fluvio_dataplane_protocol/
batch.rs

1use std::io::Error;
2use std::mem::size_of;
3use std::fmt::Debug;
4use bytes::Bytes;
5use fluvio_compression::CompressionError;
6use fluvio_types::PartitionId;
7use tracing::trace;
8
9use fluvio_compression::Compression;
10use fluvio_types::Timestamp;
11
12use crate::core::bytes::Buf;
13use crate::core::bytes::BufMut;
14
15use crate::core::Decoder;
16use crate::core::Encoder;
17use crate::core::Version;
18
19use crate::Offset;
20use crate::Size;
21use crate::record::ConsumerRecord;
22use crate::record::Record;
23
/// Mask selecting the compression-codec bits (lowest 3 bits) of `BatchHeader::attributes`.
pub const COMPRESSION_CODEC_MASK: i16 = 0x07;
/// Sentinel value meaning "no timestamp recorded".
pub const NO_TIMESTAMP: i64 = -1;
26
/// Contract for a batch's record representation: it must round-trip through
/// `Encoder`/`Decoder` and be usable across threads.
pub trait BatchRecords: Default + Debug + Encoder + Decoder + Send + Sync {
    /// How many bytes this record set wants to consume out of the remaining
    /// buffer; the default passes the remainder through unchanged.
    #[deprecated]
    fn remainder_bytes(&self, remainder: usize) -> usize {
        remainder
    }
}
34
/// A type describing in-memory records: fully decoded `Record`s,
/// as opposed to the opaque byte payload of [`RawRecords`].
pub type MemoryRecords = Vec<Record>;
37
/// A type describing Raw records
/// This struct encodes and decodes its bytes as-is — just the raw bytes of its
/// internal buffer. When decoding, be sure that your src buffer has the exact
/// number of bytes, because the whole remainder is consumed.
#[derive(Debug, Default, Clone)]
pub struct RawRecords(pub Bytes);
43
44impl Encoder for RawRecords {
45    fn write_size(&self, _version: Version) -> usize {
46        self.0.len()
47    }
48
49    fn encode<T: BufMut>(&self, buf: &mut T, _version: Version) -> Result<(), Error> {
50        buf.put_slice(&self.0);
51        Ok(())
52    }
53}
54
55impl Decoder for RawRecords {
56    fn decode<T: Buf>(&mut self, buf: &mut T, _version: Version) -> Result<(), Error> {
57        let len = buf.remaining();
58        self.0 = buf.copy_to_bytes(len);
59        Ok(())
60    }
61}
// Both representations satisfy `BatchRecords` with the default trait behavior.
impl BatchRecords for MemoryRecords {}

impl BatchRecords for RawRecords {}
65
/// size of the offset and length
pub const BATCH_PREAMBLE_SIZE: usize = size_of::<Offset>()     // base offset (i64)
        + size_of::<i32>(); // batch length (i32)

/// Everything that precedes the records payload: preamble plus batch header.
pub const BATCH_FILE_HEADER_SIZE: usize = BATCH_PREAMBLE_SIZE + BATCH_HEADER_SIZE;
71
/// A batch of records sharing a single header and a base offset.
///
/// `R` is the record representation: decoded [`MemoryRecords`] (the default)
/// or opaque [`RawRecords`].
#[derive(Default, Debug)]
pub struct Batch<R = MemoryRecords> {
    /// Offset of the first record in this batch.
    pub base_offset: Offset,
    pub batch_len: i32, // only for decoding
    pub header: BatchHeader,
    records: R,
}
79
80impl<R> Batch<R> {
81    pub fn get_mut_header(&mut self) -> &mut BatchHeader {
82        &mut self.header
83    }
84
85    pub fn get_header(&self) -> &BatchHeader {
86        &self.header
87    }
88
89    #[inline(always)]
90    pub fn own_records(self) -> R {
91        self.records
92    }
93
94    #[inline(always)]
95    pub fn records(&self) -> &R {
96        &self.records
97    }
98
99    #[inline(always)]
100    pub fn mut_records(&mut self) -> &mut R {
101        &mut self.records
102    }
103
104    pub fn get_base_offset(&self) -> Offset {
105        self.base_offset
106    }
107
108    pub fn set_base_offset(&mut self, offset: Offset) {
109        self.base_offset = offset;
110    }
111
112    pub fn base_offset(mut self, offset: Offset) -> Self {
113        self.base_offset = offset;
114        self
115    }
116
117    pub fn add_to_offset_delta(&mut self, delta: i32) {
118        self.header.last_offset_delta += delta;
119    }
120
121    pub fn set_offset_delta(&mut self, delta: i32) {
122        self.header.last_offset_delta = delta;
123    }
124
125    pub fn get_last_offset(&self) -> Offset {
126        self.get_base_offset() + self.last_offset_delta() as Offset
127    }
128
129    pub fn records_len(&self) -> usize {
130        self.last_offset_delta() as usize + 1
131    }
132    /// get last offset delta
133    #[deprecated(since = "0.9.2", note = "use last_offset_delta instead")]
134    pub fn get_last_offset_delta(&self) -> Size {
135        self.get_header().last_offset_delta as Size
136    }
137
138    pub fn last_offset_delta(&self) -> i32 {
139        self.get_header().last_offset_delta
140    }
141
142    pub fn get_compression(&self) -> Result<Compression, CompressionError> {
143        self.get_header().get_compression()
144    }
145
146    /// decode from buf stored in the file
147    /// read all excluding records
148    pub fn decode_from_file_buf<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
149    where
150        T: Buf,
151    {
152        trace!("decoding preamble");
153        self.base_offset.decode(src, version)?;
154        self.batch_len.decode(src, version)?;
155        self.header.decode(src, version)?;
156        Ok(())
157    }
158
159    /// Return the size of the batch header + records
160    pub fn batch_len(&self) -> i32 {
161        self.batch_len
162    }
163}
164
impl TryFrom<Batch<RawRecords>> for Batch {
    type Error = CompressionError;
    /// Decompress and decode the raw payload into in-memory records,
    /// recomputing `batch_len` as header size + uncompressed records size.
    fn try_from(batch: Batch<RawRecords>) -> Result<Self, Self::Error> {
        let records = batch.memory_records()?;
        Ok(Batch {
            base_offset: batch.base_offset,
            batch_len: (BATCH_HEADER_SIZE + records.write_size(0)) as i32,
            header: batch.header,
            records,
        })
    }
}
177
impl TryFrom<Batch> for Batch<RawRecords> {
    type Error = CompressionError;
    /// Encode the in-memory records and compress them with the codec stored
    /// in the header's attributes.
    ///
    /// NOTE(review): `batch_len` here is only the compressed payload length,
    /// while the reverse conversion includes `BATCH_HEADER_SIZE`. This
    /// asymmetry is asserted by `test_batch_len`, so it appears intentional —
    /// confirm before "fixing".
    fn try_from(f: Batch) -> Result<Self, Self::Error> {
        let mut buf = Vec::new();
        f.records.encode(&mut buf, 0)?;

        let compression = f.get_compression()?;
        let compressed_records = compression.compress(&buf)?;
        let compressed_records_len = compressed_records.len() as i32;
        let records = RawRecords(compressed_records);

        Ok(Batch {
            base_offset: f.base_offset,
            batch_len: compressed_records_len,
            header: f.header,
            records,
        })
    }
}
197
impl<R> Batch<R>
where
    R: Encoder,
{
    /// check if batch is valid after decoded
    /// (the decoded `batch_len` must equal header size + records size)
    pub fn validate_decoding(&self) -> bool {
        self.batch_len == (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32
    }
}
207
impl<R: BatchRecords> Batch<R> {
    /// Create a new empty batch
    pub fn new() -> Self {
        Self::default()
    }

    /// computed last offset which is base offset + number of records
    // NOTE(review): this is one past `get_last_offset` (base + last delta),
    // i.e. an exclusive end offset.
    pub fn computed_last_offset(&self) -> Offset {
        self.get_base_offset() + self.records_len() as Offset
    }
}
219
220impl Batch {
221    /// add new record, this will update the offset to correct
222    pub fn add_record(&mut self, record: Record) {
223        self.add_records(&mut vec![record]);
224        self.batch_len = (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32;
225    }
226
227    pub fn add_records(&mut self, records: &mut Vec<Record>) {
228        self.records.append(records);
229        self.batch_len = (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32;
230        self.update_offset_deltas();
231    }
232
233    pub fn update_offset_deltas(&mut self) {
234        for (index, record) in self.records.iter_mut().enumerate() {
235            record.preamble.set_offset_delta(index as Offset);
236        }
237        self.header.last_offset_delta = self.records().len() as i32 - 1;
238    }
239
240    pub fn into_consumer_records_iter(
241        self,
242        partition: PartitionId,
243    ) -> impl Iterator<Item = ConsumerRecord> {
244        let base_offset = self.base_offset;
245        let first_timestamp = self.header.first_timestamp;
246
247        self.records
248            .into_iter()
249            .enumerate()
250            .map(move |(relative, record)| ConsumerRecord {
251                partition,
252                offset: base_offset + relative as Offset,
253                timestamp_base: first_timestamp,
254                record,
255            })
256    }
257}
impl Batch<RawRecords> {
    /// Decode the raw payload into `MemoryRecords`, decompressing first when
    /// the header's compression codec is not `None`.
    pub fn memory_records(&self) -> Result<MemoryRecords, CompressionError> {
        let compression = self.get_compression()?;

        let mut records: MemoryRecords = Default::default();
        if let Compression::None = compression {
            records.decode(&mut &self.records.0[..], 0)?;
        } else {
            let decompressed = compression
                .uncompress(&self.records.0[..])?
                // `Ok(None)` for a non-None codec should not happen
                .ok_or(CompressionError::UnreachableError)?;
            records.decode(&mut &decompressed[..], 0)?;
        }
        Ok(records)
    }
}
274
impl<T: Into<MemoryRecords>> From<T> for Batch {
    /// Build a batch from anything convertible to `MemoryRecords`,
    /// assigning each record its positional offset delta.
    fn from(records: T) -> Self {
        let records = records.into();
        let mut batch = Self::default();

        let records: Vec<_> = records
            .into_iter()
            .enumerate()
            .map(|(i, mut record)| {
                record.preamble.set_offset_delta(i as Offset);
                record
            })
            .collect();

        batch.records = records;
        let len = batch.records.len() as i32;
        batch.batch_len = (BATCH_HEADER_SIZE + batch.records.write_size(0)) as i32;
        // NOTE(review): for empty input this yields last_offset_delta = 0,
        // whereas `BatchHeader::default()` uses -1 for "no records" — confirm
        // whether the empty case is intended here.
        batch.header.last_offset_delta = if len > 0 { len - 1 } else { len };
        batch
    }
}
296
297impl<R> Decoder for Batch<R>
298where
299    R: BatchRecords,
300{
301    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
302    where
303        T: Buf,
304    {
305        trace!("decoding batch");
306        self.decode_from_file_buf(src, version)?;
307
308        let batch_len = self.batch_len as usize - BATCH_HEADER_SIZE;
309        let mut buf = src.take(batch_len);
310        if buf.remaining() < batch_len {
311            return Err(Error::new(
312                std::io::ErrorKind::UnexpectedEof,
313                format!(
314                    "not enough buf records, expected: {}, found: {}",
315                    batch_len,
316                    buf.remaining()
317                ),
318            ));
319        }
320
321        self.records.decode(&mut buf, version)?;
322        Ok(())
323    }
324}
325
// Record batch contains 12 bytes of pre-amble (offset + length) plus header + records
impl<R> Encoder for Batch<R>
where
    R: BatchRecords,
{
    /// Total encoded size: preamble + header + records.
    fn write_size(&self, version: Version) -> usize {
        BATCH_FILE_HEADER_SIZE + self.records.write_size(version)
    }

    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
    where
        T: BufMut,
    {
        trace!("Encoding Batch");
        // Preamble: base offset, then the length of header + records.
        self.base_offset.encode(dest, version)?;
        let batch_len: i32 = (BATCH_HEADER_SIZE + self.records.write_size(version)) as i32;
        batch_len.encode(dest, version)?;

        // encode parts of header
        // Fields before the CRC slot go straight to `dest`; everything after
        // the CRC is first staged into `out` so the checksum can be computed
        // over those exact bytes.
        self.header.partition_leader_epoch.encode(dest, version)?;
        self.header.magic.encode(dest, version)?;

        let mut out: Vec<u8> = Vec::new();
        let buf = &mut out;
        self.header.attributes.encode(buf, version)?;
        self.header.last_offset_delta.encode(buf, version)?;
        self.header.first_timestamp.encode(buf, version)?;
        self.header.max_time_stamp.encode(buf, version)?;
        self.header.producer_id.encode(buf, version)?;
        self.header.producer_epoch.encode(buf, version)?;
        self.header.first_sequence.encode(buf, version)?;
        self.records.encode(buf, version)?;

        // CRC-32C over the staged bytes; `self.header.crc` is ignored here
        // and recomputed on every encode.
        let crc = crc32c::crc32c(&out);
        crc.encode(dest, version)?;
        dest.put_slice(&out);
        Ok(())
    }
}
365
366impl<R: Clone> Clone for Batch<R> {
367    fn clone(&self) -> Self {
368        Self {
369            base_offset: self.base_offset,
370            batch_len: self.batch_len,
371            header: self.header.clone(),
372            records: self.records.clone(),
373        }
374    }
375}
376
/// On-wire header shared by every record in a batch.
#[derive(Debug, Decoder, Encoder, Clone)]
pub struct BatchHeader {
    pub partition_leader_epoch: i32,
    /// Format/magic byte of the batch.
    pub magic: i8,
    /// Checksum read back during decode. NOTE(review): the manual
    /// `Encoder for Batch` recomputes the CRC and does not use this field.
    pub crc: u32,
    /// Bit flags; the low bits hold the compression codec
    /// (see `COMPRESSION_CODEC_MASK`).
    pub attributes: i16,
    /// Indicates the count from the beginning of the batch to the end
    ///
    /// Adding this to the base_offset will give the offset of the last record in this batch
    pub last_offset_delta: i32,
    pub first_timestamp: Timestamp,
    pub max_time_stamp: Timestamp,
    pub producer_id: i64,
    pub producer_epoch: i16,
    pub first_sequence: i32,
}
393
impl BatchHeader {
    /// Extract the compression codec from the low bits of `attributes`.
    fn get_compression(&self) -> Result<Compression, CompressionError> {
        let compression_bits = self.attributes & COMPRESSION_CODEC_MASK;
        Compression::try_from(compression_bits as i8)
    }

    /// Store the compression codec in the low bits of `attributes`,
    /// preserving every other attribute bit.
    pub fn set_compression(&mut self, compression: Compression) {
        let compression_bits = compression as i16 & COMPRESSION_CODEC_MASK;
        self.attributes = (self.attributes & !COMPRESSION_CODEC_MASK) | compression_bits;
    }

    #[cfg(feature = "memory_batch")]
    fn set_first_timestamp(&mut self, timestamp: Timestamp) {
        self.first_timestamp = timestamp;
    }

    #[cfg(feature = "memory_batch")]
    fn set_max_time_stamp(&mut self, timestamp: Timestamp) {
        self.max_time_stamp = timestamp;
    }
}
impl Default for BatchHeader {
    fn default() -> Self {
        BatchHeader {
            // -1 marks these fields as unset
            partition_leader_epoch: -1,
            // NOTE(review): presumably batch format version 2 — confirm
            magic: 2,
            crc: 0,
            attributes: 0,
            // -1 so an empty batch reports zero records (last delta + 1)
            last_offset_delta: -1,
            first_timestamp: NO_TIMESTAMP,
            max_time_stamp: NO_TIMESTAMP,
            producer_id: -1,
            producer_epoch: -1,
            first_sequence: -1,
        }
    }
}
431
/// Encoded size of `BatchHeader`, field by field.
pub const BATCH_HEADER_SIZE: usize = size_of::<i32>()     // partition leader epoch
        + size_of::<u8>()       // magic (i8)
        + size_of::<i32>()      // crc (u32)
        + size_of::<i16>()      // attributes
        + size_of::<i32>()      // last offset delta
        + size_of::<i64>()      // first_timestamp
        + size_of::<i64>()      // max_time_stamp
        + size_of::<i64>()      // producer id
        + size_of::<i16>()      // producer_epoch
        + size_of::<i32>(); // first sequence
442
443#[cfg(feature = "memory_batch")]
444pub mod memory {
445    use super::*;
446    use chrono::Utc;
    /// A mutable, in-memory staging batch used to accumulate records until a
    /// size limit is reached.
    pub struct MemoryBatch {
        // Codec applied when this batch is converted for the wire.
        compression: Compression,
        // Soft byte cap used by `push_record` to decide fullness.
        write_limit: usize,
        // Running sum of encoded record sizes, before compression.
        current_size_uncompressed: usize,
        // Set once a record no longer fits (or fills the batch exactly).
        is_full: bool,
        // Creation time (ms); also used as the batch's first timestamp.
        create_time: Timestamp,
        records: Vec<Record>,
    }
    impl MemoryBatch {
        /// Create an empty batch with the given size limit and codec.
        pub fn new(write_limit: usize, compression: Compression) -> Self {
            let now = Utc::now().timestamp_millis();
            Self {
                compression,
                is_full: false,
                write_limit,
                create_time: now,
                // start from the encoded size of an empty record list
                current_size_uncompressed: Vec::<RawRecords>::default().write_size(0),
                records: vec![],
            }
        }

        pub(crate) fn compression(&self) -> Compression {
            self.compression
        }

        /// Add a record to the batch.
        /// The value of `Offset` is relative to the `MemoryBatch` instance.
        /// Returns `None` (and marks the batch full) when the record does not fit.
        pub fn push_record(&mut self, mut record: Record) -> Option<Offset> {
            let current_offset = self.offset() as i64;
            record.preamble.set_offset_delta(current_offset as Offset);

            // timestamp delta = ms elapsed since the batch was created
            let timestamp_delta = self.elapsed();
            record.preamble.set_timestamp_delta(timestamp_delta);

            let record_size = record.write_size(0);

            if self.estimated_size() + record_size > self.write_limit {
                self.is_full = true;
                return None;
            }

            if self.estimated_size() + record_size == self.write_limit {
                // fits exactly: accept the record but close the batch
                self.is_full = true;
            }

            self.current_size_uncompressed += record_size;

            self.records.push(record);

            Some(current_offset)
        }

        pub fn is_full(&self) -> bool {
            self.is_full || self.write_limit <= self.estimated_size()
        }

        /// Milliseconds elapsed since this batch was created (clamped at zero).
        pub fn elapsed(&self) -> Timestamp {
            let now = Utc::now().timestamp_millis();

            std::cmp::max(0, now - self.create_time)
        }

        /// Rough size estimate: batch overhead plus the uncompressed payload
        /// scaled by a fixed per-codec ratio (0.5 for all real codecs) —
        /// a heuristic, not a measurement.
        fn estimated_size(&self) -> usize {
            (self.current_size_uncompressed as f32
                * match self.compression {
                    Compression::None => 1.0,
                    Compression::Gzip | Compression::Snappy | Compression::Lz4 => 0.5,
                }) as usize
                + Batch::<RawRecords>::default().write_size(0)
        }

        pub fn records_len(&self) -> usize {
            self.records.len()
        }

        /// Next relative offset (== number of records pushed so far).
        #[inline]
        pub fn offset(&self) -> usize {
            self.records_len()
        }

        pub fn current_size_uncompressed(&self) -> usize {
            self.current_size_uncompressed
        }
    }
531
    /// Finalize a staging batch into a `Batch<MemoryRecords>`, filling in
    /// timestamps, offsets, and the compression attribute.
    impl From<MemoryBatch> for Batch<MemoryRecords> {
        fn from(p_batch: MemoryBatch) -> Self {
            let mut batch = Self {
                batch_len: (BATCH_HEADER_SIZE + p_batch.records.write_size(0)) as i32,
                ..Default::default()
            };
            let compression = p_batch.compression();
            let records = p_batch.records;

            let len = records.len() as i32;
            // NOTE(review): base offset is set to len - 1 (the last relative
            // offset) rather than 0; `test_convert_memory_batch_to_batch`
            // asserts exactly this, so it appears deliberate — confirm.
            batch.set_base_offset(if len > 0 { len - 1 } else { len } as i64);

            let header = batch.get_mut_header();
            header.last_offset_delta = if len > 0 { len - 1 } else { len };

            // first timestamp = batch creation time; max timestamp is derived
            // from the last record's delta (0 when the batch is empty)
            let first_timestamp = p_batch.create_time;

            let max_time_stamp = records
                .last()
                .map(|r| first_timestamp + r.timestamp_delta())
                .unwrap_or(0);

            header.set_first_timestamp(first_timestamp);
            header.set_max_time_stamp(max_time_stamp);

            header.set_compression(compression);

            *batch.mut_records() = records;

            batch
        }
    }
564}
565
566#[cfg(test)]
567mod test {
568
569    use super::*;
570    use std::io::Cursor;
571    use std::io::Error as IoError;
572
573    use crate::core::Decoder;
574    use crate::core::Encoder;
575    use crate::record::{Record, RecordData};
576    use crate::batch::Batch;
577    use super::BatchHeader;
578    use super::BATCH_HEADER_SIZE;
579
580    #[cfg(feature = "memory_batch")]
581    use super::memory::MemoryBatch;
582
    #[test]
    // TODO(review): empty placeholder with no assertions — implement or remove.
    fn test_batch_convert_compression_size() {}
585
    #[test]
    // The default header's encoded size must match the BATCH_HEADER_SIZE constant.
    fn test_batch_size() {
        let header = BatchHeader::default();
        assert_eq!(header.write_size(0), BATCH_HEADER_SIZE);
    }
591
    #[test]
    // Round-trip a one-record batch through encode/decode and pin the CRC-32C
    // value, so any change to the wire layout is caught.
    fn test_encode_and_decode_batch() -> Result<(), IoError> {
        let value = vec![0x74, 0x65, 0x73, 0x74];
        let record = Record {
            value: RecordData::from(value),
            ..Default::default()
        };
        let mut batch = Batch::<MemoryRecords>::default();
        batch.records.push(record);
        batch.header.first_timestamp = 1555478494747;
        batch.header.max_time_stamp = 1555478494747;

        let bytes = batch.as_bytes(0)?;
        println!("batch raw bytes: {:#X?}", bytes.as_ref());

        let batch = Batch::<MemoryRecords>::decode_from(&mut Cursor::new(bytes), 0)?;
        println!("batch: {:#?}", batch);

        let decoded_record = batch.records.get(0).unwrap();
        println!("record crc: {}", batch.header.crc);
        assert_eq!(batch.header.crc, 1430948200);
        let b = decoded_record.value.as_ref();
        assert_eq!(b, b"test");
        assert!(batch.validate_decoding());

        Ok(())
    }
619
620    /*  raw batch encoded
621
622    0000   02 00 00 00 45 00 00 c7 00 00 40 00 40 06 00 00
623    0010   c0 a8 07 30 c0 a8 07 30 d1 b9 23 84 29 ba 3d 48
624    0020   0b 13 89 98 80 18 97 62 90 6a 00 00 01 01 08 0a
625    0030   1e 6f 09 0d 1e 6f 09 06 00 00 00 8f 00 00 00 05
626    0040   00 00 00 03 00 10 63 6f 6e 73 6f 6c 65 2d 70 72
627    0050   6f 64 75 63 65 72 ff ff 00 01 00 00 05 dc 00 00
628    0060   00 01 00 13 6d 79 2d 72 65 70 6c 69 63 61 74 65
629    0070   64 2d 74 6f 70 69 63 00 00 00 01 00 00 00 00 00
630    0080   00 00 48 00 00 00 00 00 00 00 00 00 00 00 3c ff
631    0090   ff ff ff 02 5a 44 2c 31 00 00 00 00 00 00 00 00
632    00a0   01 6a 29 be 3e 1b 00 00 01 6a 29 be 3e 1b ff ff
633    00b0   ff ff ff ff ff ff ff ff ff ff ff ff 00 00 00 01
634    00c0   14 00 00 00 01 08 74 65 73 74 00
635    */
636
    #[test]
    // `add_record` must keep the header's last_offset_delta and each record's
    // positional delta in step as records are added one at a time.
    fn test_batch_offset_delta() {
        let mut batch = Batch::<MemoryRecords>::default();
        assert_eq!(batch.get_base_offset(), 0);

        assert_eq!(batch.last_offset_delta(), -1);
        // last offset is -1 because there are no records in the batch
        assert_eq!(batch.get_last_offset(), -1);

        batch.add_record(Record::default());
        assert_eq!(batch.last_offset_delta(), 0);
        assert_eq!(batch.get_last_offset(), 0);

        batch.add_record(Record::default());
        assert_eq!(batch.last_offset_delta(), 1);
        assert_eq!(batch.get_last_offset(), 1);

        batch.add_record(Record::default());
        assert_eq!(batch.last_offset_delta(), 2);
        assert_eq!(batch.get_last_offset(), 2);

        assert_eq!(
            batch
                .records
                .get(0)
                .expect("index 0 should exists")
                .get_offset_delta(),
            0
        );
        assert_eq!(
            batch
                .records
                .get(1)
                .expect("index 1 should exists")
                .get_offset_delta(),
            1
        );
        assert_eq!(
            batch
                .records
                .get(2)
                .expect("index 2 should exists")
                .get_offset_delta(),
            2
        );
    }
683
    #[test]
    // Same as test_batch_offset_delta, but with a non-zero base offset:
    // deltas stay relative while get_last_offset is absolute.
    fn test_batch_offset_diff_base() {
        let mut batch = Batch::<MemoryRecords>::default();
        batch.set_base_offset(1000);
        assert_eq!(batch.get_base_offset(), 1000);

        assert_eq!(batch.last_offset_delta(), -1);
        // last offset is -1 because there are no records in the batch
        assert_eq!(batch.get_last_offset(), 999);

        batch.add_record(Record::default());
        assert_eq!(batch.last_offset_delta(), 0);
        assert_eq!(batch.get_last_offset(), 1000);

        batch.add_record(Record::default());
        assert_eq!(batch.last_offset_delta(), 1);
        assert_eq!(batch.get_last_offset(), 1001);

        batch.add_record(Record::default());
        assert_eq!(batch.last_offset_delta(), 2);
        assert_eq!(batch.get_last_offset(), 1002);

        assert_eq!(
            batch
                .records
                .get(0)
                .expect("index 0 should exists")
                .get_offset_delta(),
            0
        );
        assert_eq!(
            batch
                .records
                .get(1)
                .expect("index 1 should exists")
                .get_offset_delta(),
            1
        );
        assert_eq!(
            batch
                .records
                .get(2)
                .expect("index 2 should exists")
                .get_offset_delta(),
            2
        );
    }
731
    #[test]
    // Records appended directly (bypassing add_records) get correct deltas
    // only after an explicit update_offset_deltas call.
    fn test_records_offset_delta() {
        let mut batch = Batch::<MemoryRecords>::default();
        batch.set_base_offset(2000);
        assert_eq!(batch.get_base_offset(), 2000);

        // add records directly
        batch.records.append(&mut vec![
            Record::default(),
            Record::default(),
            Record::default(),
        ]);
        batch.update_offset_deltas();
        assert_eq!(batch.last_offset_delta(), 2);
        assert_eq!(batch.get_last_offset(), 2002);

        assert_eq!(
            batch
                .records
                .get(0)
                .expect("index 0 should exists")
                .get_offset_delta(),
            0
        );
        assert_eq!(
            batch
                .records
                .get(1)
                .expect("index 1 should exists")
                .get_offset_delta(),
            1
        );
        assert_eq!(
            batch
                .records
                .get(2)
                .expect("index 2 should exists")
                .get_offset_delta(),
            2
        );
    }
773
    #[test]
    // Building a batch via From<Vec<Record>> must assign the same offset
    // deltas as adding records one by one with add_record.
    fn test_batch_records_offset() {
        let mut comparison = Batch::<MemoryRecords>::default();
        comparison.add_record(Record::default());
        comparison.add_record(Record::default());
        comparison.add_record(Record::default());

        let batch_created = Batch::from(vec![
            Record::default(),
            Record::default(),
            Record::default(),
        ]);

        for i in 0..3 {
            assert_eq!(
                batch_created
                    .records
                    .get(i)
                    .expect("get record")
                    .get_offset_delta(),
                comparison
                    .records
                    .get(i)
                    .expect("get record")
                    .get_offset_delta(),
                "Creating a Batch from a Vec gave wrong delta",
            )
        }

        assert_eq!(batch_created.last_offset_delta(), 2);
    }
805
    #[test]
    // Conversion to consumer records must produce absolute offsets
    // (base + position) and propagate the batch timestamp and partition.
    fn test_into_consumer_records_iter() {
        let mut batch = Batch::from(vec![
            Record::default(),
            Record::default(),
            Record::default(),
        ]);

        batch.header.first_timestamp = 1_500_000_000;
        let partition_id = 1;

        let consumer_records = batch
            .into_consumer_records_iter(partition_id)
            .collect::<Vec<ConsumerRecord>>();
        assert_eq!(consumer_records.len(), 3);
        assert_eq!(consumer_records[0].offset(), 0);
        assert_eq!(consumer_records[1].offset(), 1);
        assert_eq!(consumer_records[2].offset(), 2);

        consumer_records.iter().for_each(|record| {
            assert_eq!(record.timestamp(), 1_500_000_000);
            assert_eq!(record.partition, partition_id);
        });
    }
830
    #[cfg(feature = "memory_batch")]
    #[test]
    // Pushes three records with real sleeps in between so that the per-record
    // timestamp deltas and the batch first/max timestamps are observable.
    fn test_memory_batch() {
        use super::memory::MemoryBatch;

        let record = Record::from(("key", "value"));
        let size = record.write_size(0);

        // limit sized to fit exactly the records plus batch overhead
        let mut mb = MemoryBatch::new(
            size * 4
                + Batch::<RawRecords>::default().write_size(0)
                + Vec::<RawRecords>::default().write_size(0),
            Compression::None,
        );

        assert!(mb.push_record(record).is_some());
        std::thread::sleep(std::time::Duration::from_millis(100));
        let record = Record::from(("key", "value"));
        assert!(mb.push_record(record).is_some());
        std::thread::sleep(std::time::Duration::from_millis(100));
        let record = Record::from(("key", "value"));
        assert!(mb.push_record(record).is_some());

        let batch: Batch<MemoryRecords> = mb.try_into().expect("failed to convert");
        assert!(
            batch.header.first_timestamp > 0,
            "first_timestamp is {}",
            batch.header.first_timestamp
        );
        assert!(
            batch.header.first_timestamp < batch.header.max_time_stamp,
            "first_timestamp: {}, max_time_stamp: {}",
            batch.header.first_timestamp,
            batch.header.max_time_stamp
        );

        // deltas should roughly track the sleeps above (with scheduling slack)
        let records_delta: Vec<_> = batch
            .records()
            .iter()
            .map(|record| record.timestamp_delta())
            .collect();
        assert_eq!(records_delta[0], 0);
        assert!(
            (100..150).contains(&records_delta[1]),
            "records_delta[1]: {}",
            records_delta[1]
        );
        assert!(
            (200..250).contains(&records_delta[2]),
            "records_delta[2]: {}",
            records_delta[2]
        );
    }
884
    #[test]
    // Pins batch_len bookkeeping across construction, both TryFrom
    // conversions (note: Batch<RawRecords> excludes the header size), record
    // addition, and compression (gzip output must be smaller than raw).
    fn test_batch_len() {
        let mem_records = vec![Record::default(), Record::default(), Record::default()];

        // Verify batch len is instantiated
        let batch = Batch::from(mem_records.clone());

        assert_eq!(
            batch.batch_len(),
            (BATCH_HEADER_SIZE + mem_records.write_size(0)) as i32
        );

        // Verify batch len is preserved during conversion
        let batch_raw_records: Batch<RawRecords> = Batch::try_from(batch).unwrap();
        assert_eq!(
            batch_raw_records.batch_len(),
            mem_records.write_size(0) as i32
        );

        // Verify batch len is preserved during conversion
        let batch: Batch = batch_raw_records.try_into().unwrap();
        assert_eq!(
            batch.batch_len(),
            (BATCH_HEADER_SIZE + mem_records.write_size(0)) as i32
        );

        // Verify increase in batch len when we add records
        let mut batch_mem_records: Batch<MemoryRecords> = Batch::new();
        batch_mem_records.add_records(&mut mem_records.clone());
        assert_eq!(
            batch_mem_records.batch_len(),
            (BATCH_HEADER_SIZE + mem_records.write_size(0)) as i32
        );

        batch_mem_records.add_record(Record::default());
        assert_eq!(
            batch_mem_records.batch_len(),
            (BATCH_HEADER_SIZE + batch_mem_records.records.write_size(0)) as i32
        );

        // Test compressed batch is smaller than non compressed when converted to Batch<RawRecords>
        // using record with easy to compress data
        let test_record = Record::new("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
        let mut test_records = vec![test_record.clone(), test_record.clone(), test_record];

        let mut batch_mem_1: Batch = Batch::new();
        batch_mem_1.add_records(&mut test_records.clone());

        let mut batch_mem_2: Batch = Batch::new();
        batch_mem_2.add_records(&mut test_records);

        // Verify we're starting from the same length
        assert_eq!(batch_mem_1.batch_len(), batch_mem_2.batch_len());

        // Apply compression and compare resulting sizes
        let no_compression = Compression::None;
        let compression = Compression::Gzip;

        let header_1 = batch_mem_1.get_mut_header();
        let header_2 = batch_mem_2.get_mut_header();

        header_1.set_compression(no_compression);
        header_2.set_compression(compression);

        let not_compressed: Batch<RawRecords> = Batch::try_from(batch_mem_1).unwrap();
        let compressed: Batch<RawRecords> = Batch::try_from(batch_mem_2).unwrap();

        assert_ne!(not_compressed.batch_len(), compressed.batch_len());
        assert!(not_compressed.batch_len() > compressed.batch_len());
    }
955
    #[cfg(feature = "memory_batch")]
    #[test]
    // Pins the MemoryBatch -> Batch conversion, including the (surprising)
    // base_offset = records_len - 1 behavior and batch_len accounting.
    fn test_convert_memory_batch_to_batch() {
        let num_records = 10;

        let record_data = "I am test input".to_string().into_bytes();
        let memory_batch_compression = Compression::Gzip;

        // This MemoryBatch write limit is minimal value to pass test
        let mut memory_batch = MemoryBatch::new(180, memory_batch_compression);

        let mut offset = 0;

        for _ in 0..num_records {
            offset = memory_batch
                .push_record(Record {
                    value: RecordData::from(record_data.clone()),
                    ..Default::default()
                })
                .expect("Offset should exist");
        }

        let memory_batch_records_len = memory_batch.records_len();
        let memory_batch_size_uncompressed = memory_batch.current_size_uncompressed();

        let batch: Batch<MemoryRecords> = memory_batch.into();

        assert_eq!(
            batch.get_base_offset(),
            (memory_batch_records_len - 1) as i64
        );

        assert_eq!(batch.last_offset_delta(), offset as i32);
        assert_eq!(batch.get_base_offset() as i32, batch.last_offset_delta());

        assert_eq!(
            batch.get_compression().expect("Compression should exist"),
            memory_batch_compression
        );

        assert_eq!(batch.records_len(), memory_batch_records_len);

        assert_eq!(
            batch.batch_len(),
            (BATCH_HEADER_SIZE + memory_batch_size_uncompressed) as i32
        );
    }
1003}