kafka_protocol/
records.rs

//! Provides utilities for working with records (Kafka messages).
//!
//! [`FetchResponse`](crate::messages::fetch_response::FetchResponse) and the associated APIs for
//! reading and writing expose records in a raw format, allowing the user to implement their own
//! logic for interacting with those values.
//!
//! # Example
//!
//! Decoding a set of records from a [`FetchResponse`](crate::messages::fetch_response::FetchResponse):
//! ```rust
//! use kafka_protocol::messages::FetchResponse;
//! use kafka_protocol::protocol::Decodable;
//! use kafka_protocol::records::RecordBatchDecoder;
//! use bytes::Bytes;
//! use kafka_protocol::records::Compression;
//!
//! # const HEADER: [u8; 45] = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,];
//! # const RECORD: [u8; 79] = [ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x43, 0x0, 0x0, 0x0, 0x0, 0x2, 0x73, 0x6d, 0x29, 0x7b, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x22, 0x1, 0xd0, 0xf, 0x2, 0xa, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0xa, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x0,];
//! # let mut res = vec![];
//! # res.extend_from_slice(&HEADER[..]);
//! # res.extend_from_slice(&[0x00, 0x00, 0x00, 0x4f]);
//! # res.extend_from_slice(&RECORD[..]);
//! # let mut buf = Bytes::from(res);
//!
//! let res = FetchResponse::decode(&mut buf, 4).unwrap();
//!
//! for topic in res.responses {
//!     for partition in topic.partitions {
//!         let mut records = partition.records.unwrap();
//!         let records = RecordBatchDecoder::decode_with_custom_compression(&mut records, Some(decompress_record_batch_data)).unwrap();
//!     }
//! }
//!
//! fn decompress_record_batch_data(compressed_buffer: &mut bytes::Bytes, compression: Compression) -> anyhow::Result<Bytes> {
//!     match compression {
//!         Compression::None => Ok(compressed_buffer.to_vec().into()),
//!         _ => panic!("Compression not implemented"),
//!     }
//! }
//! ```
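//!
//! Encoding a batch of records with [`RecordBatchEncoder`]; a minimal sketch in which the record
//! contents and metadata are placeholder values:
//! ```rust
//! use kafka_protocol::records::{
//!     Compression, Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType,
//!     NO_PARTITION_LEADER_EPOCH, NO_PRODUCER_EPOCH, NO_PRODUCER_ID, NO_SEQUENCE, NO_TIMESTAMP,
//! };
//! use bytes::{Bytes, BytesMut};
//! use indexmap::IndexMap;
//!
//! let record = Record {
//!     transactional: false,
//!     control: false,
//!     partition_leader_epoch: NO_PARTITION_LEADER_EPOCH,
//!     producer_id: NO_PRODUCER_ID,
//!     producer_epoch: NO_PRODUCER_EPOCH,
//!     timestamp_type: TimestampType::Creation,
//!     offset: 0,
//!     sequence: NO_SEQUENCE,
//!     timestamp: NO_TIMESTAMP,
//!     key: None,
//!     value: Some(Bytes::from_static(b"hello world")),
//!     headers: IndexMap::new(),
//! };
//!
//! let options = RecordEncodeOptions {
//!     version: 2,
//!     compression: Compression::None,
//! };
//! let mut buf = BytesMut::new();
//! RecordBatchEncoder::encode(&mut buf, &[record], &options).unwrap();
//! ```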
use anyhow::{anyhow, bail, Result};
use bytes::{Bytes, BytesMut};
use crc::{Crc, CRC_32_ISO_HDLC};
use crc32c::crc32c;
use indexmap::IndexMap;

use crate::protocol::{
    buf::{gap, ByteBuf, ByteBufMut},
    types, Decoder, Encoder, StrBytes,
};

use super::compression::{self as cmpr, Compressor, Decompressor};
use std::cmp::Ordering;
use std::convert::TryFrom;

/// The IEEE CRC-32 (CRC-32/ISO-HDLC) cyclic redundancy check.
pub const IEEE: Crc<u32> = Crc::<u32>::new(&CRC_32_ISO_HDLC);

/// The different types of compression supported by Kafka.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None = 0,
    /// The gzip compression library.
    Gzip = 1,
    /// Google's Snappy compression library.
    Snappy = 2,
    /// The LZ4 compression library.
    Lz4 = 3,
    /// Facebook's ZStandard compression library.
    Zstd = 4,
}

/// Indicates the meaning of the timestamp field on a record.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimestampType {
    /// The timestamp represents when the record was created by the client.
    Creation = 0,
    /// The timestamp represents when the record was appended to the log.
    LogAppend = 1,
}

/// Options for encoding and compressing a batch of records. Note that not all compression
/// algorithms are currently implemented by this library; each one other than `None` is gated
/// behind a cargo feature.
pub struct RecordEncodeOptions {
    /// Record version, 0, 1, or 2. Only version 2 is currently supported.
    pub version: i8,

    /// The compression algorithm to use.
    pub compression: Compression,
}

/// Value to indicate missing producer id.
pub const NO_PRODUCER_ID: i64 = -1;
/// Value to indicate missing producer epoch.
pub const NO_PRODUCER_EPOCH: i16 = -1;
/// Value to indicate missing leader epoch.
pub const NO_PARTITION_LEADER_EPOCH: i32 = -1;
/// Value to indicate missing sequence id.
pub const NO_SEQUENCE: i32 = -1;
/// Value to indicate missing timestamp.
pub const NO_TIMESTAMP: i64 = -1;

/// Batch encoder for Kafka records.
#[derive(Debug, Clone)]
pub struct RecordBatchEncoder;

/// Batch decoder for Kafka records.
#[derive(Debug, Clone)]
pub struct RecordBatchDecoder;

struct BatchDecodeInfo {
    record_count: usize,
    timestamp_type: TimestampType,
    min_offset: i64,
    min_timestamp: i64,
    base_sequence: i32,
    transactional: bool,
    control: bool,
    partition_leader_epoch: i32,
    producer_id: i64,
    producer_epoch: i16,
}

/// Decoded records plus information about compression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordSet {
    /// Compression used for this set of records.
    pub compression: Compression,
    /// Version used to encode the set of records.
    pub version: i8,
    /// Records decoded in this set.
    pub records: Vec<Record>,
}

/// A Kafka message containing key, payload value, and all associated metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record {
    // Batch properties
    /// Whether this record is transactional.
    pub transactional: bool,
    /// Whether this record is a control message, which should not be exposed to the client.
    pub control: bool,
    /// Epoch of the leader for this record's partition.
    pub partition_leader_epoch: i32,
    /// The identifier of the producer.
    pub producer_id: i64,
    /// The epoch of the producer, used to implement transactional writes.
    pub producer_epoch: i16,

    // Record properties
    /// Indicates whether the timestamp represents record creation or appending to the log.
    pub timestamp_type: TimestampType,
    /// Message offset within a partition.
    pub offset: i64,
    /// Sequence identifier used for idempotent delivery.
    pub sequence: i32,
    /// Timestamp of the record. See also `timestamp_type`.
    pub timestamp: i64,
    /// The key of the record.
    pub key: Option<Bytes>,
    /// The payload of the record.
    pub value: Option<Bytes>,
    /// Headers associated with the record's payload.
    pub headers: IndexMap<StrBytes, Option<Bytes>>,
}

const MAGIC_BYTE_OFFSET: usize = 16;

impl RecordBatchEncoder {
    /// Encode records into the given buffer, using provided encoding options that select the
    /// encoding strategy based on version.
    pub fn encode<'a, B, I>(buf: &mut B, records: I, options: &RecordEncodeOptions) -> Result<()>
    where
        B: ByteBufMut,
        I: IntoIterator<Item = &'a Record>,
        I::IntoIter: Clone,
    {
        Self::encode_with_custom_compression(
            buf,
            records,
            options,
            None::<fn(&mut BytesMut, &mut B, Compression) -> Result<()>>,
        )
    }

    /// Encode records into the given buffer, using provided encoding options that select the
    /// encoding strategy based on version.
    /// # Arguments
    /// * `compressor` - A function that compresses the given batch of records.
    ///
    /// If `None`, records are compressed with the library's built-in implementation of the
    /// algorithm specified in `options`.
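    ///
    /// A minimal sketch of a custom compressor (hypothetical; it only passes the encoded records
    /// through unchanged, as [`Compression::None`] requires):
    /// ```rust
    /// use bytes::{BufMut, BytesMut};
    /// use kafka_protocol::records::{Compression, Record, RecordBatchEncoder, RecordEncodeOptions};
    ///
    /// let records: Vec<Record> = vec![];
    /// let options = RecordEncodeOptions { version: 2, compression: Compression::None };
    /// let mut buf = BytesMut::new();
    /// RecordBatchEncoder::encode_with_custom_compression(
    ///     &mut buf,
    ///     records.iter(),
    ///     &options,
    ///     Some(|input: &mut BytesMut, output: &mut BytesMut, _compression: Compression| {
    ///         // No compression: copy the encoded records into the output buffer as-is.
    ///         output.put_slice(input);
    ///         Ok(())
    ///     }),
    /// )
    /// .unwrap();
    /// ```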
    pub fn encode_with_custom_compression<'a, B, I, CF>(
        buf: &mut B,
        records: I,
        options: &RecordEncodeOptions,
        compressor: Option<CF>,
    ) -> Result<()>
    where
        B: ByteBufMut,
        I: IntoIterator<Item = &'a Record>,
        I::IntoIter: Clone,
        CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
    {
        let records = records.into_iter();
        match options.version {
            0..=1 => bail!("message sets v{} are unsupported", options.version),
            2 => Self::encode_new(buf, records, options, compressor),
            _ => bail!("Unknown record batch version"),
        }
    }

    fn encode_new_records<'a, B, I>(
        buf: &mut B,
        records: I,
        min_offset: i64,
        min_timestamp: i64,
        options: &RecordEncodeOptions,
    ) -> Result<()>
    where
        B: ByteBufMut,
        I: Iterator<Item = &'a Record>,
    {
        for record in records {
            record.encode_new(buf, min_offset, min_timestamp, options)?;
        }
        Ok(())
    }

    fn encode_new_batch<'a, B, I, CF>(
        buf: &mut B,
        records: &mut I,
        options: &RecordEncodeOptions,
        compressor: Option<&CF>,
    ) -> Result<bool>
    where
        B: ByteBufMut,
        I: Iterator<Item = &'a Record> + Clone,
        CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
    {
        let mut record_peeker = records.clone();

        // Get first record
        let first_record = match record_peeker.next() {
            Some(record) => record,
            None => return Ok(false),
        };

        // Determine how many additional records can be included in the batch
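        // (Records are grouped into one batch for as long as the batch-level fields match the
        // first record and each record's offset and sequence advance in lockstep, so a single
        // base sequence can reconstruct every sequence number in the batch.)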
        let num_records = record_peeker
            .take_while(|record| {
                record.transactional == first_record.transactional
                    && record.control == first_record.control
                    && record.partition_leader_epoch == first_record.partition_leader_epoch
                    && record.producer_id == first_record.producer_id
                    && record.producer_epoch == first_record.producer_epoch
                    && (record.offset as i32).wrapping_sub(record.sequence)
                        == (first_record.offset as i32).wrapping_sub(first_record.sequence)
            })
            .count()
            + 1;

        // Aggregate various record properties
        let min_offset = records
            .clone()
            .take(num_records)
            .map(|r| r.offset)
            .min()
            .expect("Batch contains at least one element");
        let max_offset = records
            .clone()
            .take(num_records)
            .map(|r| r.offset)
            .max()
            .expect("Batch contains at least one element");
        let min_timestamp = records
            .clone()
            .take(num_records)
            .map(|r| r.timestamp)
            .min()
            .expect("Batch contains at least one element");
        let max_timestamp = records
            .clone()
            .take(num_records)
            .map(|r| r.timestamp)
            .max()
            .expect("Batch contains at least one element");
        let base_sequence = first_record
            .sequence
            .wrapping_sub((first_record.offset - min_offset) as i32);

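        // The v2 batch header is laid out as: base offset (int64), batch length (int32),
        // partition leader epoch (int32), magic (int8), CRC-32C (uint32), attributes (int16),
        // last offset delta (int32), first and max timestamps (int64 each), producer id (int64),
        // producer epoch (int16), base sequence (int32), and record count (int32).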
        // Base offset
        types::Int64.encode(buf, min_offset)?;

        // Batch length
        let size_gap = buf.put_typed_gap(gap::I32);
        let batch_start = buf.offset();

        // Partition leader epoch
        types::Int32.encode(buf, first_record.partition_leader_epoch)?;

        // Magic byte
        types::Int8.encode(buf, options.version)?;

        // CRC
        let crc_gap = buf.put_typed_gap(gap::U32);
        let content_start = buf.offset();

        // Attributes
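        // (Bits 0-2 of the attributes hold the compression codec, bit 3 the timestamp type,
        // bit 4 the transactional flag, and bit 5 the control flag.)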
        let mut attributes = options.compression as i16;
        if first_record.transactional {
            attributes |= 1 << 4;
        }
        if first_record.control {
            attributes |= 1 << 5;
        }
        types::Int16.encode(buf, attributes)?;

        // Last offset delta
        types::Int32.encode(buf, (max_offset - min_offset) as i32)?;

        // First timestamp
        types::Int64.encode(buf, min_timestamp)?;

        // Last timestamp
        types::Int64.encode(buf, max_timestamp)?;

        // Producer ID
        types::Int64.encode(buf, first_record.producer_id)?;

        // Producer epoch
        types::Int16.encode(buf, first_record.producer_epoch)?;

        // Base sequence
        types::Int32.encode(buf, base_sequence)?;

        // Record count
        if num_records > i32::MAX as usize {
            bail!(
                "Too many records to encode in one batch ({} records)",
                num_records
            );
        }
        types::Int32.encode(buf, num_records as i32)?;

        // Records
        let records = records.take(num_records);

        if let Some(compressor) = compressor {
            let mut record_buf = BytesMut::new();
            Self::encode_new_records(&mut record_buf, records, min_offset, min_timestamp, options)?;
            compressor(&mut record_buf, buf, options.compression)?;
        } else {
            match options.compression {
                Compression::None => cmpr::None::compress(buf, |buf| {
                    Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
                })?,
                #[cfg(feature = "snappy")]
                Compression::Snappy => cmpr::Snappy::compress(buf, |buf| {
                    Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
                })?,
                #[cfg(feature = "gzip")]
                Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
                    Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
                })?,
                #[cfg(feature = "lz4")]
                Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
                    Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
                })?,
                #[cfg(feature = "zstd")]
                Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
                    Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
                })?,
                #[allow(unreachable_patterns)]
                c => {
                    return Err(anyhow!(
                        "Support for {c:?} is not enabled as a cargo feature"
                    ))
                }
            }
        }
        let batch_end = buf.offset();

        // Fill size gap
        let batch_size = batch_end - batch_start;
        if batch_size > i32::MAX as usize {
            bail!(
                "Record batch was too large to encode ({} bytes)",
                batch_size
            );
        }

        buf.fill_typed_gap(size_gap, batch_size as i32);

        // Fill CRC gap
        let crc = crc32c(buf.range(content_start..batch_end));
        buf.fill_typed_gap(crc_gap, crc);

        Ok(true)
    }

    fn encode_new<'a, B, I, CF>(
        buf: &mut B,
        mut records: I,
        options: &RecordEncodeOptions,
        compressor: Option<CF>,
    ) -> Result<()>
    where
        B: ByteBufMut,
        I: Iterator<Item = &'a Record> + Clone,
        CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
    {
        while Self::encode_new_batch(buf, &mut records, options, compressor.as_ref())? {}
        Ok(())
    }
}

impl RecordBatchDecoder {
    /// Decode one [`RecordSet`] from the provided buffer.
    /// # Arguments
    /// * `decompressor` - A function that decompresses the given batch of records.
    ///
    /// If `None`, records are decompressed with the library's built-in implementation of the
    /// algorithm indicated by the batch attributes.
    pub fn decode_with_custom_compression<B: ByteBuf, F>(
        buf: &mut B,
        decompressor: Option<F>,
    ) -> Result<RecordSet>
    where
        F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
    {
        let mut records = Vec::new();
        let (version, compression) =
            Self::decode_into_vec(buf, &mut records, decompressor.as_ref())?;
        Ok(RecordSet {
            version,
            compression,
            records,
        })
    }

    /// Decode the entire buffer into a `Vec` of [`RecordSet`]s.
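    ///
    /// A minimal sketch, assuming `buf` holds zero or more complete record batches (for example,
    /// the raw `records` bytes of a fetched partition):
    /// ```rust
    /// use bytes::Bytes;
    /// use kafka_protocol::records::RecordBatchDecoder;
    ///
    /// let mut buf = Bytes::new();
    /// let batches = RecordBatchDecoder::decode_all(&mut buf).unwrap();
    /// for batch in &batches {
    ///     println!("{} records, compression {:?}", batch.records.len(), batch.compression);
    /// }
    /// ```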
    pub fn decode_all<B: ByteBuf>(buf: &mut B) -> Result<Vec<RecordSet>> {
        let mut batches = Vec::new();
        while buf.has_remaining() {
            batches.push(Self::decode(buf)?);
        }
        Ok(batches)
    }

    /// Decode one [`RecordSet`] from the provided buffer.
    pub fn decode<B: ByteBuf>(buf: &mut B) -> Result<RecordSet> {
        Self::decode_with_custom_compression(
            buf,
            None::<fn(&mut bytes::Bytes, Compression) -> Result<B>>.as_ref(),
        )
    }

    fn decode_into_vec<B: ByteBuf, F>(
        buf: &mut B,
        records: &mut Vec<Record>,
        decompress_func: Option<&F>,
    ) -> Result<(i8, Compression)>
    where
        F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
    {
        let version = buf.try_peek_bytes(MAGIC_BYTE_OFFSET..(MAGIC_BYTE_OFFSET + 1))?[0] as i8;
        let compression = match version {
            0..=1 => bail!("message sets v{} are unsupported", version),
            2 => Self::decode_new_batch(buf, version, records, decompress_func),
            _ => {
                bail!("Unknown record batch version ({})", version);
            }
        }?;
        Ok((version, compression))
    }

    fn decode_new_records<B: ByteBuf>(
        buf: &mut B,
        batch_decode_info: &BatchDecodeInfo,
        version: i8,
        records: &mut Vec<Record>,
    ) -> Result<()> {
        records.reserve(batch_decode_info.record_count);
        for _ in 0..batch_decode_info.record_count {
            records.push(Record::decode_new(buf, batch_decode_info, version)?);
        }
        Ok(())
    }

    fn decode_new_batch<B: ByteBuf, F>(
        buf: &mut B,
        version: i8,
        records: &mut Vec<Record>,
        decompress_func: Option<&F>,
    ) -> Result<Compression>
    where
        F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
    {
        // Base offset
        let min_offset = types::Int64.decode(buf)?;

        // Batch length
        let batch_length: i32 = types::Int32.decode(buf)?;
        if batch_length < 0 {
            bail!("Unexpected negative batch size: {}", batch_length);
        }

        // Convert buf to bytes
        let buf = &mut buf.try_get_bytes(batch_length as usize)?;

        // Partition leader epoch
        let partition_leader_epoch = types::Int32.decode(buf)?;

        // Magic byte
        let magic: i8 = types::Int8.decode(buf)?;
        if magic != version {
            bail!("Version mismatch ({} != {})", magic, version);
        }

        // CRC
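        // (The v2 CRC is CRC-32C (Castagnoli), computed over everything from the attributes
        // field to the end of the batch; the fields before it are excluded so a broker can
        // update the partition leader epoch without recomputing the checksum.)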
        let supplied_crc: u32 = types::UInt32.decode(buf)?;
        let actual_crc = crc32c(buf);

        if supplied_crc != actual_crc {
            bail!(
                "Cyclic redundancy check failed ({} != {})",
                supplied_crc,
                actual_crc
            );
        }

        // Attributes
        let attributes: i16 = types::Int16.decode(buf)?;
        let transactional = (attributes & (1 << 4)) != 0;
        let control = (attributes & (1 << 5)) != 0;
        let compression = match attributes & 0x7 {
            0 => Compression::None,
            1 => Compression::Gzip,
            2 => Compression::Snappy,
            3 => Compression::Lz4,
            4 => Compression::Zstd,
            other => {
                bail!("Unknown compression algorithm used: {}", other);
            }
        };
        let timestamp_type = if (attributes & (1 << 3)) != 0 {
            TimestampType::LogAppend
        } else {
            TimestampType::Creation
        };

        // Last offset delta
        let _max_offset_delta: i32 = types::Int32.decode(buf)?;

        // First timestamp
        let min_timestamp = types::Int64.decode(buf)?;

        // Last timestamp
        let _max_timestamp: i64 = types::Int64.decode(buf)?;

        // Producer ID
        let producer_id = types::Int64.decode(buf)?;

        // Producer epoch
        let producer_epoch = types::Int16.decode(buf)?;

        // Base sequence
        let base_sequence = types::Int32.decode(buf)?;

        // Record count
        let record_count: i32 = types::Int32.decode(buf)?;
        if record_count < 0 {
            bail!("Unexpected negative record count ({})", record_count);
        }
        let record_count = record_count as usize;

        let batch_decode_info = BatchDecodeInfo {
            record_count,
            timestamp_type,
            min_offset,
            min_timestamp,
            base_sequence,
            transactional,
            control,
            partition_leader_epoch,
            producer_id,
            producer_epoch,
        };

        if let Some(decompress_func) = decompress_func {
            let mut decompressed_buf = decompress_func(buf, compression)?;

            Self::decode_new_records(&mut decompressed_buf, &batch_decode_info, version, records)?;
        } else {
            match compression {
                Compression::None => cmpr::None::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "snappy")]
                Compression::Snappy => cmpr::Snappy::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "gzip")]
                Compression::Gzip => cmpr::Gzip::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "zstd")]
                Compression::Zstd => cmpr::Zstd::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[cfg(feature = "lz4")]
                Compression::Lz4 => cmpr::Lz4::decompress(buf, |buf| {
                    Self::decode_new_records(buf, &batch_decode_info, version, records)
                })?,
                #[allow(unreachable_patterns)]
                c => {
                    return Err(anyhow!(
                        "Support for {c:?} is not enabled as a cargo feature"
                    ))
                }
            };
        }

        Ok(compression)
    }
}

impl Record {
    fn encode_new<B: ByteBufMut>(
        &self,
        buf: &mut B,
        min_offset: i64,
        min_timestamp: i64,
        options: &RecordEncodeOptions,
    ) -> Result<()> {
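        // v2 records are encoded as: length (varint), attributes (int8), timestamp delta
        // (varint), offset delta (varint), key length (varint) and key, value length (varint)
        // and value, then a header count (varint) followed by the headers.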
        // Size
        let size = self.compute_size_new(min_offset, min_timestamp, options)?;
        if size > i32::MAX as usize {
            bail!("Record was too large to encode ({} bytes)", size);
        }
        types::VarInt.encode(buf, size as i32)?;

        // Attributes
        types::Int8.encode(buf, 0)?;

        // Timestamp delta
        let timestamp_delta = self.timestamp - min_timestamp;
        if timestamp_delta > i32::MAX as i64 || timestamp_delta < i32::MIN as i64 {
            bail!(
                "Timestamps within batch are too far apart ({}, {})",
                min_timestamp,
                self.timestamp
            );
        }
        types::VarInt.encode(buf, timestamp_delta as i32)?;

        // Offset delta
        let offset_delta = self.offset - min_offset;
        if offset_delta > i32::MAX as i64 || offset_delta < i32::MIN as i64 {
            bail!(
                "Offsets within batch are too far apart ({}, {})",
                min_offset,
                self.offset
            );
        }
        types::VarInt.encode(buf, offset_delta as i32)?;

        // Key
        if let Some(k) = self.key.as_ref() {
            if k.len() > i32::MAX as usize {
                bail!("Record key was too large to encode ({} bytes)", k.len());
            }
            types::VarInt.encode(buf, k.len() as i32)?;
            buf.put_slice(k);
        } else {
            types::VarInt.encode(buf, -1)?;
        }

        // Value
        if let Some(v) = self.value.as_ref() {
            if v.len() > i32::MAX as usize {
                bail!("Record value was too large to encode ({} bytes)", v.len());
            }
            types::VarInt.encode(buf, v.len() as i32)?;
            buf.put_slice(v);
        } else {
            types::VarInt.encode(buf, -1)?;
        }

        // Headers
        if self.headers.len() > i32::MAX as usize {
            bail!("Too many record headers to encode ({})", self.headers.len());
        }
        types::VarInt.encode(buf, self.headers.len() as i32)?;
        for (k, v) in &self.headers {
            // Key len
            if k.len() > i32::MAX as usize {
                bail!(
                    "Record header key was too large to encode ({} bytes)",
                    k.len()
                );
            }
            types::VarInt.encode(buf, k.len() as i32)?;

            // Key
            buf.put_slice(k.as_ref());

            // Value
            if let Some(v) = v.as_ref() {
                if v.len() > i32::MAX as usize {
                    bail!(
                        "Record header value was too large to encode ({} bytes)",
                        v.len()
                    );
                }
                types::VarInt.encode(buf, v.len() as i32)?;
                buf.put_slice(v);
            } else {
                types::VarInt.encode(buf, -1)?;
            }
        }

        Ok(())
    }

    fn compute_size_new(
        &self,
        min_offset: i64,
        min_timestamp: i64,
        _options: &RecordEncodeOptions,
    ) -> Result<usize> {
        let mut total_size = 0;

        // Attributes
        total_size += types::Int8.compute_size(0)?;

        // Timestamp delta
        let timestamp_delta = self.timestamp - min_timestamp;
        if timestamp_delta > i32::MAX as i64 || timestamp_delta < i32::MIN as i64 {
            bail!(
                "Timestamps within batch are too far apart ({}, {})",
                min_timestamp,
                self.timestamp
            );
        }
        total_size += types::VarInt.compute_size(timestamp_delta as i32)?;

        // Offset delta
        let offset_delta = self.offset - min_offset;
        if offset_delta > i32::MAX as i64 || offset_delta < i32::MIN as i64 {
            bail!(
                "Offsets within batch are too far apart ({}, {})",
                min_offset,
                self.offset
            );
        }
        total_size += types::VarInt.compute_size(offset_delta as i32)?;

        // Key
        if let Some(k) = self.key.as_ref() {
            if k.len() > i32::MAX as usize {
                bail!("Record key was too large to encode ({} bytes)", k.len());
            }
            total_size += types::VarInt.compute_size(k.len() as i32)?;
            total_size += k.len();
        } else {
            total_size += types::VarInt.compute_size(-1)?;
        }

        // Value
        if let Some(v) = self.value.as_ref() {
            if v.len() > i32::MAX as usize {
                bail!("Record value was too large to encode ({} bytes)", v.len());
            }
            total_size += types::VarInt.compute_size(v.len() as i32)?;
            total_size += v.len();
        } else {
            total_size += types::VarInt.compute_size(-1)?;
        }

        // Headers
        if self.headers.len() > i32::MAX as usize {
            bail!("Too many record headers to encode ({})", self.headers.len());
        }
        total_size += types::VarInt.compute_size(self.headers.len() as i32)?;
        for (k, v) in &self.headers {
            // Key len
            if k.len() > i32::MAX as usize {
                bail!(
                    "Record header key was too large to encode ({} bytes)",
                    k.len()
                );
            }
            total_size += types::VarInt.compute_size(k.len() as i32)?;

            // Key
            total_size += k.len();

            // Value
            if let Some(v) = v.as_ref() {
                if v.len() > i32::MAX as usize {
                    bail!(
                        "Record header value was too large to encode ({} bytes)",
                        v.len()
                    );
                }
                total_size += types::VarInt.compute_size(v.len() as i32)?;
                total_size += v.len();
            } else {
                total_size += types::VarInt.compute_size(-1)?;
            }
        }

        Ok(total_size)
    }

    fn decode_new<B: ByteBuf>(
        buf: &mut B,
        batch_decode_info: &BatchDecodeInfo,
        _version: i8,
    ) -> Result<Self> {
        // Size
        let size: i32 = types::VarInt.decode(buf)?;
        if size < 0 {
            bail!("Unexpected negative record size: {}", size);
        }

        // Ensure we don't over-read
        let buf = &mut buf.try_get_bytes(size as usize)?;

        // Attributes
        let _attributes: i8 = types::Int8.decode(buf)?;

        // Timestamp delta
        let timestamp_delta: i32 = types::VarInt.decode(buf)?;
        let timestamp = batch_decode_info.min_timestamp + timestamp_delta as i64;

        // Offset delta
        let offset_delta: i32 = types::VarInt.decode(buf)?;
        let offset = batch_decode_info.min_offset + offset_delta as i64;
        let sequence = batch_decode_info.base_sequence.wrapping_add(offset_delta);

        // Key
        let key_len: i32 = types::VarInt.decode(buf)?;
        let key = match key_len.cmp(&-1) {
            Ordering::Less => {
                bail!("Unexpected negative record key length ({} bytes)", key_len);
            }
            Ordering::Equal => None,
            Ordering::Greater => Some(buf.try_get_bytes(key_len as usize)?),
        };

        // Value
        let value_len: i32 = types::VarInt.decode(buf)?;
        let value = match value_len.cmp(&-1) {
            Ordering::Less => {
                bail!(
                    "Unexpected negative record value length ({} bytes)",
                    value_len
                );
            }
            Ordering::Equal => None,
            Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?),
        };

        // Headers
        let num_headers: i32 = types::VarInt.decode(buf)?;
        if num_headers < 0 {
            bail!("Unexpected negative record header count: {}", num_headers);
        }
        let num_headers = num_headers as usize;

        let mut headers = IndexMap::with_capacity(num_headers);
        for _ in 0..num_headers {
            // Key len
            let key_len: i32 = types::VarInt.decode(buf)?;
            if key_len < 0 {
                bail!(
                    "Unexpected negative record header key length ({} bytes)",
                    key_len
                );
            }

            // Key
            let key = StrBytes::try_from(buf.try_get_bytes(key_len as usize)?)?;

            // Value len
            let value_len: i32 = types::VarInt.decode(buf)?;

            // Value
            let value = match value_len.cmp(&-1) {
                Ordering::Less => {
                    bail!(
                        "Unexpected negative record header value length ({} bytes)",
                        value_len
                    );
                }
                Ordering::Equal => None,
                Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?),
            };

            headers.insert(key, value);
        }

        Ok(Self {
            transactional: batch_decode_info.transactional,
            control: batch_decode_info.control,
            timestamp_type: batch_decode_info.timestamp_type,
            partition_leader_epoch: batch_decode_info.partition_leader_epoch,
            producer_id: batch_decode_info.producer_id,
            producer_epoch: batch_decode_info.producer_epoch,
            sequence,
            offset,
            timestamp,
            key,
            value,
            headers,
        })
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;

    #[test]
    fn lookup_header_via_u8_slice() {
        let record = Record {
            transactional: false,
            control: false,
            partition_leader_epoch: 0,
            producer_id: 0,
            producer_epoch: 0,
            sequence: 0,
            timestamp_type: TimestampType::Creation,
            offset: Default::default(),
            timestamp: Default::default(),
            key: Default::default(),
            value: Default::default(),
            headers: [
                ("some-key".into(), Some("some-value".into())),
                ("other-header".into(), None),
            ]
            .into(),
        };
        assert_eq!(
            Bytes::from("some-value"),
            record
                .headers
                // This relies on `impl Borrow<[u8]> for StrBytes`
                .get("some-key".as_bytes())
                .expect("key exists in headers")
                .as_ref()
                .expect("value is present")
        );
    }

    #[test]
    fn decode_record_header_no_value() {
        let record = Record {
            transactional: false,
            control: false,
            partition_leader_epoch: 0,
            producer_id: 0,
            producer_epoch: 0,
            sequence: 0,
            timestamp_type: TimestampType::Creation,
            offset: Default::default(),
            timestamp: Default::default(),
            key: Default::default(),
            value: Default::default(),
            headers: [("other-header".into(), None)].into(),
        };
        let mut buf = &mut bytes::BytesMut::new();
        record
            .encode_new(
                buf,
                0,
                0,
                &RecordEncodeOptions {
                    version: 2,
                    compression: super::Compression::None,
                },
            )
            .expect("encode works");

        Record::decode_new(
            &mut buf,
            &BatchDecodeInfo {
                record_count: 1,
                timestamp_type: TimestampType::Creation,
                min_offset: 0,
                min_timestamp: 0,
                base_sequence: 0,
                transactional: false,
                control: false,
                partition_leader_epoch: 0,
                producer_id: 0,
                producer_epoch: 0,
            },
            2,
        )
        .expect("decode works");
    }
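
    // A minimal roundtrip sketch: encode a single-record batch with the public encoder, then
    // decode it with the public decoder and check the record survives unchanged.
    #[test]
    fn roundtrip_single_record() {
        let record = Record {
            transactional: false,
            control: false,
            partition_leader_epoch: 0,
            producer_id: NO_PRODUCER_ID,
            producer_epoch: NO_PRODUCER_EPOCH,
            sequence: 0,
            timestamp_type: TimestampType::Creation,
            offset: 0,
            timestamp: 0,
            key: Some(Bytes::from_static(b"key")),
            value: Some(Bytes::from_static(b"value")),
            headers: Default::default(),
        };
        let options = RecordEncodeOptions {
            version: 2,
            compression: Compression::None,
        };

        let mut buf = bytes::BytesMut::new();
        RecordBatchEncoder::encode(&mut buf, &[record.clone()], &options).expect("encode works");

        let set = RecordBatchDecoder::decode(&mut buf.freeze()).expect("decode works");
        assert_eq!(set.records, vec![record]);
    }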
}