use std::io::Error;
use std::mem::size_of;
use std::fmt::Debug;
use log::trace;
use crate::core::bytes::Buf;
use crate::core::bytes::BufMut;
use crate::core::Decoder;
use crate::core::Encoder;
use crate::core::Version;
use crate::derive::Decode;
use crate::derive::Encode;
use crate::Offset;
use crate::Size;
use crate::record::DefaultRecord;
pub type DefaultBatchRecords = Vec<DefaultRecord>;
pub type DefaultBatch = Batch<DefaultBatchRecords>;
pub trait BatchRecords: Default + Debug + Encoder + Decoder {
fn remainder_bytes(&self, remainder: usize) -> usize {
remainder
}
}
impl BatchRecords for DefaultBatchRecords {}
pub const BATCH_PREAMBLE_SIZE: usize = size_of::<Offset>()
+ size_of::<i32>();
pub const BATCH_FILE_HEADER_SIZE: usize = BATCH_PREAMBLE_SIZE + BATCH_HEADER_SIZE;
#[derive(Default, Debug)]
pub struct Batch<R>
where
R: BatchRecords,
{
pub base_offset: Offset,
pub batch_len: i32,
pub header: BatchHeader,
records: R,
}
impl<R> Batch<R>
where
R: BatchRecords,
{
pub fn get_mut_header(&mut self) -> &mut BatchHeader {
&mut self.header
}
pub fn get_header(&self) -> &BatchHeader {
&self.header
}
#[inline(always)]
pub fn own_records(self) -> R {
self.records
}
#[inline(always)]
pub fn records(&self) -> &R {
&self.records
}
#[inline(always)]
pub fn mut_records(&mut self) -> &mut R {
&mut self.records
}
pub fn get_base_offset(&self) -> Offset {
self.base_offset
}
pub fn set_base_offset(&mut self, offset: Offset) {
self.base_offset = offset;
}
pub fn base_offset(mut self, offset: Offset) -> Self {
self.base_offset = offset;
self
}
pub fn add_to_offset_delta(&mut self, delta: i32) {
self.header.last_offset_delta += delta;
}
pub fn set_offset_delta(&mut self, delta: i32) {
self.header.last_offset_delta = delta;
}
pub fn get_last_offset(&self) -> Offset {
self.get_base_offset() + self.get_last_offset_delta() as Offset
}
pub fn get_last_offset_delta(&self) -> Size {
self.get_header().last_offset_delta as Size
}
pub fn decode_from_file_buf<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
where
T: Buf,
{
trace!("decoding premable");
self.base_offset.decode(src, version)?;
self.batch_len.decode(src, version)?;
self.header.decode(src, version)?;
Ok(())
}
}
impl DefaultBatch {
pub fn new(records: Vec<DefaultRecord>) -> Self {
let mut batch = DefaultBatch::default();
let records: Vec<_> = records
.into_iter()
.enumerate()
.map(|(i, mut record)| {
record.preamble.set_offset_delta(i as Offset);
record
})
.collect();
batch.records = records;
let len = batch.records.len() as i32;
batch.header.last_offset_delta = if len > 0 { len - 1 } else { len };
batch
}
pub fn add_record(&mut self, mut record: DefaultRecord) {
let last_offset_delta = if self.records.is_empty() {
0
} else {
self.records.len() as Offset
};
record.preamble.set_offset_delta(last_offset_delta);
self.header.last_offset_delta = last_offset_delta as i32;
self.records.push(record)
}
pub fn computed_last_offset(&self) -> Offset {
self.get_base_offset() + self.records.len() as Offset
}
}
impl<R> Decoder for Batch<R>
where
R: BatchRecords,
{
fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
where
T: Buf,
{
trace!("decoding batch");
self.decode_from_file_buf(src, version)?;
self.records.decode(src, version)?;
Ok(())
}
}
impl<R> Encoder for Batch<R>
where
R: BatchRecords,
{
fn write_size(&self, version: Version) -> usize {
BATCH_PREAMBLE_SIZE + BATCH_HEADER_SIZE + self.records.write_size(version)
}
fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
where
T: BufMut,
{
trace!("Encoding Batch");
self.base_offset.encode(dest, version)?;
let batch_len: i32 = (BATCH_HEADER_SIZE + self.records.write_size(version)) as i32;
batch_len.encode(dest, version)?;
self.header.partition_leader_epoch.encode(dest, version)?;
self.header.magic.encode(dest, version)?;
let mut out: Vec<u8> = Vec::new();
let buf = &mut out;
self.header.attributes.encode(buf, version)?;
self.header.last_offset_delta.encode(buf, version)?;
self.header.first_timestamp.encode(buf, version)?;
self.header.max_time_stamp.encode(buf, version)?;
self.header.producer_id.encode(buf, version)?;
self.header.producer_epoch.encode(buf, version)?;
self.header.first_sequence.encode(buf, version)?;
self.records.encode(buf, version)?;
let crc = crc32c::crc32c(&out);
crc.encode(dest, version)?;
dest.put_slice(&out);
Ok(())
}
}
#[derive(Debug, Decode, Encode)]
pub struct BatchHeader {
pub partition_leader_epoch: i32,
pub magic: i8,
pub crc: u32,
pub attributes: i16,
pub last_offset_delta: i32,
pub first_timestamp: i64,
pub max_time_stamp: i64,
pub producer_id: i64,
pub producer_epoch: i16,
pub first_sequence: i32,
}
impl Default for BatchHeader {
fn default() -> Self {
BatchHeader {
partition_leader_epoch: -1,
magic: 2,
crc: 0,
attributes: 0,
last_offset_delta: 0,
first_timestamp: 0,
max_time_stamp: 0,
producer_id: -1,
producer_epoch: -1,
first_sequence: -1,
}
}
}
pub const BATCH_HEADER_SIZE: usize = size_of::<i32>()
+ size_of::<u8>()
+ size_of::<i32>()
+ size_of::<i16>()
+ size_of::<i32>()
+ size_of::<i64>()
+ size_of::<i64>()
+ size_of::<i64>()
+ size_of::<i16>()
+ size_of::<i32>();
#[cfg(test)]
mod test {
use std::io::Cursor;
use std::io::Error as IoError;
use crate::core::Decoder;
use crate::core::Encoder;
use crate::record::{DefaultRecord, DefaultAsyncBuffer};
use crate::batch::DefaultBatch;
use super::BatchHeader;
use super::BATCH_HEADER_SIZE;
#[test]
fn test_batch_size() {
let header = BatchHeader::default();
assert_eq!(header.write_size(0), BATCH_HEADER_SIZE);
}
#[test]
fn test_encode_and_decode_batch() -> Result<(), IoError> {
let value = vec![0x74, 0x65, 0x73, 0x74];
let record = DefaultRecord {
value: DefaultAsyncBuffer::new(value),
..Default::default()
};
let mut batch = DefaultBatch::default();
batch.records.push(record);
batch.header.first_timestamp = 1555478494747;
batch.header.max_time_stamp = 1555478494747;
let bytes = batch.as_bytes(0)?;
println!("batch raw bytes: {:#X?}", bytes.as_ref());
let batch = DefaultBatch::decode_from(&mut Cursor::new(bytes), 0)?;
println!("batch: {:#?}", batch);
let decoded_record = batch.records.get(0).unwrap();
println!("record crc: {}", batch.header.crc);
assert_eq!(batch.header.crc, 843514105);
let b = decoded_record.value.as_ref();
assert_eq!(b, b"test");
Ok(())
}
#[test]
fn test_records_offset() {
let mut batch = DefaultBatch::default();
assert_eq!(batch.get_last_offset_delta(), 0);
batch.add_record(DefaultRecord::default());
assert_eq!(batch.get_last_offset_delta(), 0);
batch.add_record(DefaultRecord::default());
assert_eq!(batch.get_last_offset_delta(), 1);
batch.add_record(DefaultRecord::default());
assert_eq!(batch.get_last_offset_delta(), 2);
assert_eq!(
batch
.records
.get(0)
.expect("index 0 should exists")
.get_offset_delta(),
0
);
assert_eq!(
batch
.records
.get(1)
.expect("index 1 should exists")
.get_offset_delta(),
1
);
assert_eq!(
batch
.records
.get(2)
.expect("index 2 should exists")
.get_offset_delta(),
2
);
assert_eq!(batch.get_last_offset_delta(), 2);
}
#[test]
fn test_batch_records_offset() {
let mut comparison = DefaultBatch::default();
comparison.add_record(DefaultRecord::default());
comparison.add_record(DefaultRecord::default());
comparison.add_record(DefaultRecord::default());
let batch_created = DefaultBatch::new(vec![
DefaultRecord::default(),
DefaultRecord::default(),
DefaultRecord::default(),
]);
for i in 0..3 {
assert_eq!(
batch_created
.records
.get(i)
.expect("get record")
.get_offset_delta(),
comparison
.records
.get(i)
.expect("get record")
.get_offset_delta(),
"Creating a DefaultBatch from a Vec gave wrong delta",
)
}
assert_eq!(batch_created.get_last_offset_delta(), 2);
}
}