1use std::io::Error;
2use std::mem::size_of;
3use std::fmt::Debug;
4use bytes::Bytes;
5use fluvio_compression::CompressionError;
6use fluvio_types::PartitionId;
7use tracing::trace;
8
9use fluvio_compression::Compression;
10use fluvio_types::Timestamp;
11
12use crate::core::bytes::Buf;
13use crate::core::bytes::BufMut;
14
15use crate::core::Decoder;
16use crate::core::Encoder;
17use crate::core::Version;
18
19use crate::Offset;
20use crate::Size;
21use crate::record::ConsumerRecord;
22use crate::record::Record;
23
/// Bit mask selecting the compression codec bits inside
/// [`BatchHeader::attributes`].
pub const COMPRESSION_CODEC_MASK: i16 = 0b0000_0111;
/// Sentinel used by [`BatchHeader::default`] for both timestamp fields.
pub const NO_TIMESTAMP: i64 = -1;
26
27pub trait BatchRecords: Default + Debug + Encoder + Decoder + Send + Sync {
28 #[deprecated]
30 fn remainder_bytes(&self, remainder: usize) -> usize {
31 remainder
32 }
33}
34
/// Records held in decoded form, as a plain vector of [`Record`]s.
pub type MemoryRecords = Vec<Record>;
37
38#[derive(Debug, Default, Clone)]
42pub struct RawRecords(pub Bytes);
43
44impl Encoder for RawRecords {
45 fn write_size(&self, _version: Version) -> usize {
46 self.0.len()
47 }
48
49 fn encode<T: BufMut>(&self, buf: &mut T, _version: Version) -> Result<(), Error> {
50 buf.put_slice(&self.0);
51 Ok(())
52 }
53}
54
55impl Decoder for RawRecords {
56 fn decode<T: Buf>(&mut self, buf: &mut T, _version: Version) -> Result<(), Error> {
57 let len = buf.remaining();
58 self.0 = buf.copy_to_bytes(len);
59 Ok(())
60 }
61}
// Decoded records can be used as a batch payload; the trait's default methods are kept.
impl BatchRecords for MemoryRecords {}
63
// Raw byte payloads can be used as a batch payload; the trait's default methods are kept.
impl BatchRecords for RawRecords {}
65
66pub const BATCH_PREAMBLE_SIZE: usize = size_of::<Offset>() + size_of::<i32>(); pub const BATCH_FILE_HEADER_SIZE: usize = BATCH_PREAMBLE_SIZE + BATCH_HEADER_SIZE;
71
72#[derive(Default, Debug)]
73pub struct Batch<R = MemoryRecords> {
74 pub base_offset: Offset,
75 pub batch_len: i32, pub header: BatchHeader,
77 records: R,
78}
79
80impl<R> Batch<R> {
81 pub fn get_mut_header(&mut self) -> &mut BatchHeader {
82 &mut self.header
83 }
84
85 pub fn get_header(&self) -> &BatchHeader {
86 &self.header
87 }
88
89 #[inline(always)]
90 pub fn own_records(self) -> R {
91 self.records
92 }
93
94 #[inline(always)]
95 pub fn records(&self) -> &R {
96 &self.records
97 }
98
99 #[inline(always)]
100 pub fn mut_records(&mut self) -> &mut R {
101 &mut self.records
102 }
103
104 pub fn get_base_offset(&self) -> Offset {
105 self.base_offset
106 }
107
108 pub fn set_base_offset(&mut self, offset: Offset) {
109 self.base_offset = offset;
110 }
111
112 pub fn base_offset(mut self, offset: Offset) -> Self {
113 self.base_offset = offset;
114 self
115 }
116
117 pub fn add_to_offset_delta(&mut self, delta: i32) {
118 self.header.last_offset_delta += delta;
119 }
120
121 pub fn set_offset_delta(&mut self, delta: i32) {
122 self.header.last_offset_delta = delta;
123 }
124
125 pub fn get_last_offset(&self) -> Offset {
126 self.get_base_offset() + self.last_offset_delta() as Offset
127 }
128
129 pub fn records_len(&self) -> usize {
130 self.last_offset_delta() as usize + 1
131 }
132 #[deprecated(since = "0.9.2", note = "use last_offset_delta instead")]
134 pub fn get_last_offset_delta(&self) -> Size {
135 self.get_header().last_offset_delta as Size
136 }
137
138 pub fn last_offset_delta(&self) -> i32 {
139 self.get_header().last_offset_delta
140 }
141
142 pub fn get_compression(&self) -> Result<Compression, CompressionError> {
143 self.get_header().get_compression()
144 }
145
146 pub fn decode_from_file_buf<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
149 where
150 T: Buf,
151 {
152 trace!("decoding preamble");
153 self.base_offset.decode(src, version)?;
154 self.batch_len.decode(src, version)?;
155 self.header.decode(src, version)?;
156 Ok(())
157 }
158
159 pub fn batch_len(&self) -> i32 {
161 self.batch_len
162 }
163}
164
165impl TryFrom<Batch<RawRecords>> for Batch {
166 type Error = CompressionError;
167 fn try_from(batch: Batch<RawRecords>) -> Result<Self, Self::Error> {
168 let records = batch.memory_records()?;
169 Ok(Batch {
170 base_offset: batch.base_offset,
171 batch_len: (BATCH_HEADER_SIZE + records.write_size(0)) as i32,
172 header: batch.header,
173 records,
174 })
175 }
176}
177
178impl TryFrom<Batch> for Batch<RawRecords> {
179 type Error = CompressionError;
180 fn try_from(f: Batch) -> Result<Self, Self::Error> {
181 let mut buf = Vec::new();
182 f.records.encode(&mut buf, 0)?;
183
184 let compression = f.get_compression()?;
185 let compressed_records = compression.compress(&buf)?;
186 let compressed_records_len = compressed_records.len() as i32;
187 let records = RawRecords(compressed_records);
188
189 Ok(Batch {
190 base_offset: f.base_offset,
191 batch_len: compressed_records_len,
192 header: f.header,
193 records,
194 })
195 }
196}
197
198impl<R> Batch<R>
199where
200 R: Encoder,
201{
202 pub fn validate_decoding(&self) -> bool {
204 self.batch_len == (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32
205 }
206}
207
208impl<R: BatchRecords> Batch<R> {
209 pub fn new() -> Self {
211 Self::default()
212 }
213
214 pub fn computed_last_offset(&self) -> Offset {
216 self.get_base_offset() + self.records_len() as Offset
217 }
218}
219
220impl Batch {
221 pub fn add_record(&mut self, record: Record) {
223 self.add_records(&mut vec![record]);
224 self.batch_len = (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32;
225 }
226
227 pub fn add_records(&mut self, records: &mut Vec<Record>) {
228 self.records.append(records);
229 self.batch_len = (BATCH_HEADER_SIZE + self.records.write_size(0)) as i32;
230 self.update_offset_deltas();
231 }
232
233 pub fn update_offset_deltas(&mut self) {
234 for (index, record) in self.records.iter_mut().enumerate() {
235 record.preamble.set_offset_delta(index as Offset);
236 }
237 self.header.last_offset_delta = self.records().len() as i32 - 1;
238 }
239
240 pub fn into_consumer_records_iter(
241 self,
242 partition: PartitionId,
243 ) -> impl Iterator<Item = ConsumerRecord> {
244 let base_offset = self.base_offset;
245 let first_timestamp = self.header.first_timestamp;
246
247 self.records
248 .into_iter()
249 .enumerate()
250 .map(move |(relative, record)| ConsumerRecord {
251 partition,
252 offset: base_offset + relative as Offset,
253 timestamp_base: first_timestamp,
254 record,
255 })
256 }
257}
258impl Batch<RawRecords> {
259 pub fn memory_records(&self) -> Result<MemoryRecords, CompressionError> {
260 let compression = self.get_compression()?;
261
262 let mut records: MemoryRecords = Default::default();
263 if let Compression::None = compression {
264 records.decode(&mut &self.records.0[..], 0)?;
265 } else {
266 let decompressed = compression
267 .uncompress(&self.records.0[..])?
268 .ok_or(CompressionError::UnreachableError)?;
269 records.decode(&mut &decompressed[..], 0)?;
270 }
271 Ok(records)
272 }
273}
274
275impl<T: Into<MemoryRecords>> From<T> for Batch {
276 fn from(records: T) -> Self {
277 let records = records.into();
278 let mut batch = Self::default();
279
280 let records: Vec<_> = records
281 .into_iter()
282 .enumerate()
283 .map(|(i, mut record)| {
284 record.preamble.set_offset_delta(i as Offset);
285 record
286 })
287 .collect();
288
289 batch.records = records;
290 let len = batch.records.len() as i32;
291 batch.batch_len = (BATCH_HEADER_SIZE + batch.records.write_size(0)) as i32;
292 batch.header.last_offset_delta = if len > 0 { len - 1 } else { len };
293 batch
294 }
295}
296
297impl<R> Decoder for Batch<R>
298where
299 R: BatchRecords,
300{
301 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
302 where
303 T: Buf,
304 {
305 trace!("decoding batch");
306 self.decode_from_file_buf(src, version)?;
307
308 let batch_len = self.batch_len as usize - BATCH_HEADER_SIZE;
309 let mut buf = src.take(batch_len);
310 if buf.remaining() < batch_len {
311 return Err(Error::new(
312 std::io::ErrorKind::UnexpectedEof,
313 format!(
314 "not enough buf records, expected: {}, found: {}",
315 batch_len,
316 buf.remaining()
317 ),
318 ));
319 }
320
321 self.records.decode(&mut buf, version)?;
322 Ok(())
323 }
324}
325
326impl<R> Encoder for Batch<R>
328where
329 R: BatchRecords,
330{
331 fn write_size(&self, version: Version) -> usize {
332 BATCH_FILE_HEADER_SIZE + self.records.write_size(version)
333 }
334
335 fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
336 where
337 T: BufMut,
338 {
339 trace!("Encoding Batch");
340 self.base_offset.encode(dest, version)?;
341 let batch_len: i32 = (BATCH_HEADER_SIZE + self.records.write_size(version)) as i32;
342 batch_len.encode(dest, version)?;
343
344 self.header.partition_leader_epoch.encode(dest, version)?;
346 self.header.magic.encode(dest, version)?;
347
348 let mut out: Vec<u8> = Vec::new();
349 let buf = &mut out;
350 self.header.attributes.encode(buf, version)?;
351 self.header.last_offset_delta.encode(buf, version)?;
352 self.header.first_timestamp.encode(buf, version)?;
353 self.header.max_time_stamp.encode(buf, version)?;
354 self.header.producer_id.encode(buf, version)?;
355 self.header.producer_epoch.encode(buf, version)?;
356 self.header.first_sequence.encode(buf, version)?;
357 self.records.encode(buf, version)?;
358
359 let crc = crc32c::crc32c(&out);
360 crc.encode(dest, version)?;
361 dest.put_slice(&out);
362 Ok(())
363 }
364}
365
366impl<R: Clone> Clone for Batch<R> {
367 fn clone(&self) -> Self {
368 Self {
369 base_offset: self.base_offset,
370 batch_len: self.batch_len,
371 header: self.header.clone(),
372 records: self.records.clone(),
373 }
374 }
375}
376
377#[derive(Debug, Decoder, Encoder, Clone)]
378pub struct BatchHeader {
379 pub partition_leader_epoch: i32,
380 pub magic: i8,
381 pub crc: u32,
382 pub attributes: i16,
383 pub last_offset_delta: i32,
387 pub first_timestamp: Timestamp,
388 pub max_time_stamp: Timestamp,
389 pub producer_id: i64,
390 pub producer_epoch: i16,
391 pub first_sequence: i32,
392}
393
394impl BatchHeader {
395 fn get_compression(&self) -> Result<Compression, CompressionError> {
396 let compression_bits = self.attributes & COMPRESSION_CODEC_MASK;
397 Compression::try_from(compression_bits as i8)
398 }
399
400 pub fn set_compression(&mut self, compression: Compression) {
401 let compression_bits = compression as i16 & COMPRESSION_CODEC_MASK;
402 self.attributes = (self.attributes & !COMPRESSION_CODEC_MASK) | compression_bits;
403 }
404
405 #[cfg(feature = "memory_batch")]
406 fn set_first_timestamp(&mut self, timestamp: Timestamp) {
407 self.first_timestamp = timestamp;
408 }
409
410 #[cfg(feature = "memory_batch")]
411 fn set_max_time_stamp(&mut self, timestamp: Timestamp) {
412 self.max_time_stamp = timestamp;
413 }
414}
415impl Default for BatchHeader {
416 fn default() -> Self {
417 BatchHeader {
418 partition_leader_epoch: -1,
419 magic: 2,
420 crc: 0,
421 attributes: 0,
422 last_offset_delta: -1,
423 first_timestamp: NO_TIMESTAMP,
424 max_time_stamp: NO_TIMESTAMP,
425 producer_id: -1,
426 producer_epoch: -1,
427 first_sequence: -1,
428 }
429 }
430}
431
432pub const BATCH_HEADER_SIZE: usize = size_of::<i32>() + size_of::<u8>() + size_of::<i32>() + size_of::<i16>() + size_of::<i32>() + size_of::<i64>() + size_of::<i64>() + size_of::<i64>() + size_of::<i16>() + size_of::<i32>(); #[cfg(feature = "memory_batch")]
444pub mod memory {
445 use super::*;
446 use chrono::Utc;
447 pub struct MemoryBatch {
448 compression: Compression,
449 write_limit: usize,
450 current_size_uncompressed: usize,
451 is_full: bool,
452 create_time: Timestamp,
453 records: Vec<Record>,
454 }
455 impl MemoryBatch {
456 pub fn new(write_limit: usize, compression: Compression) -> Self {
457 let now = Utc::now().timestamp_millis();
458 Self {
459 compression,
460 is_full: false,
461 write_limit,
462 create_time: now,
463 current_size_uncompressed: Vec::<RawRecords>::default().write_size(0),
464 records: vec![],
465 }
466 }
467
468 pub(crate) fn compression(&self) -> Compression {
469 self.compression
470 }
471
472 pub fn push_record(&mut self, mut record: Record) -> Option<Offset> {
475 let current_offset = self.offset() as i64;
476 record.preamble.set_offset_delta(current_offset as Offset);
477
478 let timestamp_delta = self.elapsed();
479 record.preamble.set_timestamp_delta(timestamp_delta);
480
481 let record_size = record.write_size(0);
482
483 if self.estimated_size() + record_size > self.write_limit {
484 self.is_full = true;
485 return None;
486 }
487
488 if self.estimated_size() + record_size == self.write_limit {
489 self.is_full = true;
490 }
491
492 self.current_size_uncompressed += record_size;
493
494 self.records.push(record);
495
496 Some(current_offset)
497 }
498
499 pub fn is_full(&self) -> bool {
500 self.is_full || self.write_limit <= self.estimated_size()
501 }
502
503 pub fn elapsed(&self) -> Timestamp {
504 let now = Utc::now().timestamp_millis();
505
506 std::cmp::max(0, now - self.create_time)
507 }
508
509 fn estimated_size(&self) -> usize {
510 (self.current_size_uncompressed as f32
511 * match self.compression {
512 Compression::None => 1.0,
513 Compression::Gzip | Compression::Snappy | Compression::Lz4 => 0.5,
514 }) as usize
515 + Batch::<RawRecords>::default().write_size(0)
516 }
517
518 pub fn records_len(&self) -> usize {
519 self.records.len()
520 }
521
522 #[inline]
523 pub fn offset(&self) -> usize {
524 self.records_len()
525 }
526
527 pub fn current_size_uncompressed(&self) -> usize {
528 self.current_size_uncompressed
529 }
530 }
531
532 impl From<MemoryBatch> for Batch<MemoryRecords> {
533 fn from(p_batch: MemoryBatch) -> Self {
534 let mut batch = Self {
535 batch_len: (BATCH_HEADER_SIZE + p_batch.records.write_size(0)) as i32,
536 ..Default::default()
537 };
538 let compression = p_batch.compression();
539 let records = p_batch.records;
540
541 let len = records.len() as i32;
542 batch.set_base_offset(if len > 0 { len - 1 } else { len } as i64);
543
544 let header = batch.get_mut_header();
545 header.last_offset_delta = if len > 0 { len - 1 } else { len };
546
547 let first_timestamp = p_batch.create_time;
548
549 let max_time_stamp = records
550 .last()
551 .map(|r| first_timestamp + r.timestamp_delta())
552 .unwrap_or(0);
553
554 header.set_first_timestamp(first_timestamp);
555 header.set_max_time_stamp(max_time_stamp);
556
557 header.set_compression(compression);
558
559 *batch.mut_records() = records;
560
561 batch
562 }
563 }
564}
565
566#[cfg(test)]
567mod test {
568
569 use super::*;
570 use std::io::Cursor;
571 use std::io::Error as IoError;
572
573 use crate::core::Decoder;
574 use crate::core::Encoder;
575 use crate::record::{Record, RecordData};
576 use crate::batch::Batch;
577 use super::BatchHeader;
578 use super::BATCH_HEADER_SIZE;
579
580 #[cfg(feature = "memory_batch")]
581 use super::memory::MemoryBatch;
582
    // Empty test body: nothing is asserted here yet.
    #[test]
    fn test_batch_convert_compression_size() {}
585
586 #[test]
587 fn test_batch_size() {
588 let header = BatchHeader::default();
589 assert_eq!(header.write_size(0), BATCH_HEADER_SIZE);
590 }
591
592 #[test]
593 fn test_encode_and_decode_batch() -> Result<(), IoError> {
594 let value = vec![0x74, 0x65, 0x73, 0x74];
595 let record = Record {
596 value: RecordData::from(value),
597 ..Default::default()
598 };
599 let mut batch = Batch::<MemoryRecords>::default();
600 batch.records.push(record);
601 batch.header.first_timestamp = 1555478494747;
602 batch.header.max_time_stamp = 1555478494747;
603
604 let bytes = batch.as_bytes(0)?;
605 println!("batch raw bytes: {:#X?}", bytes.as_ref());
606
607 let batch = Batch::<MemoryRecords>::decode_from(&mut Cursor::new(bytes), 0)?;
608 println!("batch: {:#?}", batch);
609
610 let decoded_record = batch.records.get(0).unwrap();
611 println!("record crc: {}", batch.header.crc);
612 assert_eq!(batch.header.crc, 1430948200);
613 let b = decoded_record.value.as_ref();
614 assert_eq!(b, b"test");
615 assert!(batch.validate_decoding());
616
617 Ok(())
618 }
619
620 #[test]
638 fn test_batch_offset_delta() {
639 let mut batch = Batch::<MemoryRecords>::default();
640 assert_eq!(batch.get_base_offset(), 0);
641
642 assert_eq!(batch.last_offset_delta(), -1);
643 assert_eq!(batch.get_last_offset(), -1);
645
646 batch.add_record(Record::default());
647 assert_eq!(batch.last_offset_delta(), 0);
648 assert_eq!(batch.get_last_offset(), 0);
649
650 batch.add_record(Record::default());
651 assert_eq!(batch.last_offset_delta(), 1);
652 assert_eq!(batch.get_last_offset(), 1);
653
654 batch.add_record(Record::default());
655 assert_eq!(batch.last_offset_delta(), 2);
656 assert_eq!(batch.get_last_offset(), 2);
657
658 assert_eq!(
659 batch
660 .records
661 .get(0)
662 .expect("index 0 should exists")
663 .get_offset_delta(),
664 0
665 );
666 assert_eq!(
667 batch
668 .records
669 .get(1)
670 .expect("index 1 should exists")
671 .get_offset_delta(),
672 1
673 );
674 assert_eq!(
675 batch
676 .records
677 .get(2)
678 .expect("index 2 should exists")
679 .get_offset_delta(),
680 2
681 );
682 }
683
684 #[test]
685 fn test_batch_offset_diff_base() {
686 let mut batch = Batch::<MemoryRecords>::default();
687 batch.set_base_offset(1000);
688 assert_eq!(batch.get_base_offset(), 1000);
689
690 assert_eq!(batch.last_offset_delta(), -1);
691 assert_eq!(batch.get_last_offset(), 999);
693
694 batch.add_record(Record::default());
695 assert_eq!(batch.last_offset_delta(), 0);
696 assert_eq!(batch.get_last_offset(), 1000);
697
698 batch.add_record(Record::default());
699 assert_eq!(batch.last_offset_delta(), 1);
700 assert_eq!(batch.get_last_offset(), 1001);
701
702 batch.add_record(Record::default());
703 assert_eq!(batch.last_offset_delta(), 2);
704 assert_eq!(batch.get_last_offset(), 1002);
705
706 assert_eq!(
707 batch
708 .records
709 .get(0)
710 .expect("index 0 should exists")
711 .get_offset_delta(),
712 0
713 );
714 assert_eq!(
715 batch
716 .records
717 .get(1)
718 .expect("index 1 should exists")
719 .get_offset_delta(),
720 1
721 );
722 assert_eq!(
723 batch
724 .records
725 .get(2)
726 .expect("index 2 should exists")
727 .get_offset_delta(),
728 2
729 );
730 }
731
732 #[test]
733 fn test_records_offset_delta() {
734 let mut batch = Batch::<MemoryRecords>::default();
735 batch.set_base_offset(2000);
736 assert_eq!(batch.get_base_offset(), 2000);
737
738 batch.records.append(&mut vec![
740 Record::default(),
741 Record::default(),
742 Record::default(),
743 ]);
744 batch.update_offset_deltas();
745 assert_eq!(batch.last_offset_delta(), 2);
746 assert_eq!(batch.get_last_offset(), 2002);
747
748 assert_eq!(
749 batch
750 .records
751 .get(0)
752 .expect("index 0 should exists")
753 .get_offset_delta(),
754 0
755 );
756 assert_eq!(
757 batch
758 .records
759 .get(1)
760 .expect("index 1 should exists")
761 .get_offset_delta(),
762 1
763 );
764 assert_eq!(
765 batch
766 .records
767 .get(2)
768 .expect("index 2 should exists")
769 .get_offset_delta(),
770 2
771 );
772 }
773
774 #[test]
775 fn test_batch_records_offset() {
776 let mut comparison = Batch::<MemoryRecords>::default();
777 comparison.add_record(Record::default());
778 comparison.add_record(Record::default());
779 comparison.add_record(Record::default());
780
781 let batch_created = Batch::from(vec![
782 Record::default(),
783 Record::default(),
784 Record::default(),
785 ]);
786
787 for i in 0..3 {
788 assert_eq!(
789 batch_created
790 .records
791 .get(i)
792 .expect("get record")
793 .get_offset_delta(),
794 comparison
795 .records
796 .get(i)
797 .expect("get record")
798 .get_offset_delta(),
799 "Creating a Batch from a Vec gave wrong delta",
800 )
801 }
802
803 assert_eq!(batch_created.last_offset_delta(), 2);
804 }
805
806 #[test]
807 fn test_into_consumer_records_iter() {
808 let mut batch = Batch::from(vec![
809 Record::default(),
810 Record::default(),
811 Record::default(),
812 ]);
813
814 batch.header.first_timestamp = 1_500_000_000;
815 let partition_id = 1;
816
817 let consumer_records = batch
818 .into_consumer_records_iter(partition_id)
819 .collect::<Vec<ConsumerRecord>>();
820 assert_eq!(consumer_records.len(), 3);
821 assert_eq!(consumer_records[0].offset(), 0);
822 assert_eq!(consumer_records[1].offset(), 1);
823 assert_eq!(consumer_records[2].offset(), 2);
824
825 consumer_records.iter().for_each(|record| {
826 assert_eq!(record.timestamp(), 1_500_000_000);
827 assert_eq!(record.partition, partition_id);
828 });
829 }
830
831 #[cfg(feature = "memory_batch")]
832 #[test]
833 fn test_memory_batch() {
834 use super::memory::MemoryBatch;
835
836 let record = Record::from(("key", "value"));
837 let size = record.write_size(0);
838
839 let mut mb = MemoryBatch::new(
840 size * 4
841 + Batch::<RawRecords>::default().write_size(0)
842 + Vec::<RawRecords>::default().write_size(0),
843 Compression::None,
844 );
845
846 assert!(mb.push_record(record).is_some());
847 std::thread::sleep(std::time::Duration::from_millis(100));
848 let record = Record::from(("key", "value"));
849 assert!(mb.push_record(record).is_some());
850 std::thread::sleep(std::time::Duration::from_millis(100));
851 let record = Record::from(("key", "value"));
852 assert!(mb.push_record(record).is_some());
853
854 let batch: Batch<MemoryRecords> = mb.try_into().expect("failed to convert");
855 assert!(
856 batch.header.first_timestamp > 0,
857 "first_timestamp is {}",
858 batch.header.first_timestamp
859 );
860 assert!(
861 batch.header.first_timestamp < batch.header.max_time_stamp,
862 "first_timestamp: {}, max_time_stamp: {}",
863 batch.header.first_timestamp,
864 batch.header.max_time_stamp
865 );
866
867 let records_delta: Vec<_> = batch
868 .records()
869 .iter()
870 .map(|record| record.timestamp_delta())
871 .collect();
872 assert_eq!(records_delta[0], 0);
873 assert!(
874 (100..150).contains(&records_delta[1]),
875 "records_delta[1]: {}",
876 records_delta[1]
877 );
878 assert!(
879 (200..250).contains(&records_delta[2]),
880 "records_delta[2]: {}",
881 records_delta[2]
882 );
883 }
884
885 #[test]
886 fn test_batch_len() {
887 let mem_records = vec![Record::default(), Record::default(), Record::default()];
888
889 let batch = Batch::from(mem_records.clone());
891
892 assert_eq!(
893 batch.batch_len(),
894 (BATCH_HEADER_SIZE + mem_records.write_size(0)) as i32
895 );
896
897 let batch_raw_records: Batch<RawRecords> = Batch::try_from(batch).unwrap();
899 assert_eq!(
900 batch_raw_records.batch_len(),
901 mem_records.write_size(0) as i32
902 );
903
904 let batch: Batch = batch_raw_records.try_into().unwrap();
906 assert_eq!(
907 batch.batch_len(),
908 (BATCH_HEADER_SIZE + mem_records.write_size(0)) as i32
909 );
910
911 let mut batch_mem_records: Batch<MemoryRecords> = Batch::new();
913 batch_mem_records.add_records(&mut mem_records.clone());
914 assert_eq!(
915 batch_mem_records.batch_len(),
916 (BATCH_HEADER_SIZE + mem_records.write_size(0)) as i32
917 );
918
919 batch_mem_records.add_record(Record::default());
920 assert_eq!(
921 batch_mem_records.batch_len(),
922 (BATCH_HEADER_SIZE + batch_mem_records.records.write_size(0)) as i32
923 );
924
925 let test_record = Record::new("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
928 let mut test_records = vec![test_record.clone(), test_record.clone(), test_record];
929
930 let mut batch_mem_1: Batch = Batch::new();
931 batch_mem_1.add_records(&mut test_records.clone());
932
933 let mut batch_mem_2: Batch = Batch::new();
934 batch_mem_2.add_records(&mut test_records);
935
936 assert_eq!(batch_mem_1.batch_len(), batch_mem_2.batch_len());
938
939 let no_compression = Compression::None;
941 let compression = Compression::Gzip;
942
943 let header_1 = batch_mem_1.get_mut_header();
944 let header_2 = batch_mem_2.get_mut_header();
945
946 header_1.set_compression(no_compression);
947 header_2.set_compression(compression);
948
949 let not_compressed: Batch<RawRecords> = Batch::try_from(batch_mem_1).unwrap();
950 let compressed: Batch<RawRecords> = Batch::try_from(batch_mem_2).unwrap();
951
952 assert_ne!(not_compressed.batch_len(), compressed.batch_len());
953 assert!(not_compressed.batch_len() > compressed.batch_len());
954 }
955
956 #[cfg(feature = "memory_batch")]
957 #[test]
958 fn test_convert_memory_batch_to_batch() {
959 let num_records = 10;
960
961 let record_data = "I am test input".to_string().into_bytes();
962 let memory_batch_compression = Compression::Gzip;
963
964 let mut memory_batch = MemoryBatch::new(180, memory_batch_compression);
966
967 let mut offset = 0;
968
969 for _ in 0..num_records {
970 offset = memory_batch
971 .push_record(Record {
972 value: RecordData::from(record_data.clone()),
973 ..Default::default()
974 })
975 .expect("Offset should exist");
976 }
977
978 let memory_batch_records_len = memory_batch.records_len();
979 let memory_batch_size_uncompressed = memory_batch.current_size_uncompressed();
980
981 let batch: Batch<MemoryRecords> = memory_batch.into();
982
983 assert_eq!(
984 batch.get_base_offset(),
985 (memory_batch_records_len - 1) as i64
986 );
987
988 assert_eq!(batch.last_offset_delta(), offset as i32);
989 assert_eq!(batch.get_base_offset() as i32, batch.last_offset_delta());
990
991 assert_eq!(
992 batch.get_compression().expect("Compression should exist"),
993 memory_batch_compression
994 );
995
996 assert_eq!(batch.records_len(), memory_batch_records_len);
997
998 assert_eq!(
999 batch.batch_len(),
1000 (BATCH_HEADER_SIZE + memory_batch_size_uncompressed) as i32
1001 );
1002 }
1003}