1
2use std::io::Error;
3use std::mem::size_of;
4use std::fmt::Debug;
5
6use log::trace;
7use crc32c;
8
9use kf_protocol::bytes::Buf;
10use kf_protocol::bytes::BufMut;
11
12use kf_protocol::Decoder;
13use kf_protocol::Encoder;
14use kf_protocol::Version;
15use kf_protocol_derive::Decode;
16
17use crate::Offset;
18use crate::Size;
19use super::DefaultRecord;
20
21pub type DefaultBatchRecords = Vec<DefaultRecord>;
22pub type DefaultBatch = Batch<DefaultBatchRecords>;
23
24
25pub trait BatchRecords: Default + Debug + Encoder + Decoder {
26
27 fn remainder_bytes(&self,remainder: usize ) -> usize {
29 remainder
30 }
31
32}
33
34
35impl BatchRecords for DefaultBatchRecords {}
36
37
38
39pub const BATCH_PREAMBLE_SIZE: usize =
41 size_of::<Offset>() + size_of::<i32>(); #[derive(Default,Debug)]
46pub struct Batch<R> where R: BatchRecords {
47 pub base_offset: Offset,
48 pub batch_len: i32, pub header: BatchHeader,
50 pub records: R
51}
52
53impl <R>Batch<R> where R: BatchRecords {
54
55 pub fn get_mut_header(&mut self) -> &mut BatchHeader {
56 &mut self.header
57 }
58
59 pub fn get_header(&self) -> &BatchHeader {
60 &self.header
61 }
62
63 pub fn get_base_offset(&self) -> Offset {
64 self.base_offset
65 }
66
67 pub fn set_base_offset(&mut self,offset: Offset) {
68 self.base_offset = offset;
69 }
70
71 pub fn base_offset(mut self, offset: Offset) -> Self {
72 self.base_offset = offset;
73 self
74 }
75
76 pub fn set_offset_delta(&mut self,delta: i32) {
77 self.header.last_offset_delta = delta;
78 }
79
80 pub fn get_last_offset(&self) -> Offset {
81 self.get_base_offset() + self.get_last_offset_delta() as Offset
82 }
83
84
85 pub fn get_last_offset_delta(&self) -> Size {
87 self.get_header().last_offset_delta as Size
88 }
89
90 pub fn decode_from_file_buf<T>(&mut self, src: &mut T,version: Version) -> Result<(), Error> where T: Buf,
93 {
94 trace!("decoding premable");
95 self.base_offset.decode(src,version)?;
96 self.batch_len.decode(src,version)?;
97 self.header.decode(src,version)?;
98 Ok(())
99 }
100
101
102}
103
104
105
106
107impl Batch<DefaultBatchRecords> {
108
109
110 pub fn add_record(&mut self,mut record: DefaultRecord) {
112 let last_offset_delta = if self.records.len() == 0 { 0 } else { self.records.len() as Offset };
113 record.preamble.set_offset_delta(last_offset_delta);
114 self.header.last_offset_delta = last_offset_delta as i32;
115 self.records.push(record)
116 }
117
118}
119
120
121
122impl <R>Decoder for Batch<R> where R: BatchRecords {
123
124 fn decode<T>(&mut self, src: &mut T,version: Version) -> Result<(), Error> where T: Buf,
125 {
126 trace!("decoding batch");
127 self.decode_from_file_buf(src,version)?;
128 self.records.decode(src,version)?;
129 Ok(())
130 }
131}
132
133
134
135impl <R>Encoder for Batch<R> where R: BatchRecords {
137
138 fn write_size(&self,version: Version) -> usize {
139 BATCH_PREAMBLE_SIZE + BATCH_HEADER_SIZE + self.records.write_size(version)
140 }
141
142 fn encode<T>(&self, dest: &mut T,version: Version) -> Result<(), Error> where T: BufMut
143 {
144 trace!("Encoding Batch");
145 self.base_offset.encode(dest,version)?;
146 let batch_len: i32 = (BATCH_HEADER_SIZE + self.records.write_size(version)) as i32;
147 batch_len.encode(dest,version)?;
148
149 self.header.partition_leader_epoch.encode(dest,version)?;
151 self.header.magic.encode(dest,version)?;
152
153
154 let mut out: Vec<u8> = Vec::new();
155 let buf = &mut out;
156 self.header.attributes.encode(buf,version)?;
157 self.header.last_offset_delta.encode(buf,version)?;
158 self.header.first_timestamp.encode(buf,version)?;
159 self.header.max_time_stamp.encode(buf,version)?;
160 self.header.producer_id.encode(buf,version)?;
161 self.header.producer_epoch.encode(buf,version)?;
162 self.header.first_sequence.encode(buf,version)?;
163 self.records.encode(buf,version)?;
164
165 let crc = crc32c::crc32c(&out);
166 crc.encode(dest,version)?;
167 dest.put_slice(&out);
168 Ok(())
169 }
170}
171
172
173#[derive(Debug,Decode)]
174pub struct BatchHeader {
175 pub partition_leader_epoch: i32,
176 pub magic: i8,
177 pub crc: u32,
178 pub attributes: i16,
179 pub last_offset_delta: i32,
180 pub first_timestamp: i64,
181 pub max_time_stamp: i64,
182 pub producer_id: i64,
183 pub producer_epoch: i16,
184 pub first_sequence: i32,
185}
186
187
188impl Default for BatchHeader {
189
190 fn default() -> Self {
191 BatchHeader {
192 partition_leader_epoch: -1,
193 magic: 2,
194 crc: 0,
195 attributes: 0,
196 last_offset_delta: 0,
197 first_timestamp: 0,
198 max_time_stamp: 0,
199 producer_id: -1,
200 producer_epoch: -1,
201 first_sequence: -1,
202 }
203 }
204
205}
206
207#[allow(dead_code)]
208pub const BATCH_HEADER_SIZE: usize =
209 size_of::<i32>() + size_of::<u8>() + size_of::<i32>() + size_of::<i16>() + size_of::<i32>() + size_of::<i64>() + size_of::<i64>() + size_of::<i64>() + size_of::<i16>() + size_of::<i32>(); #[cfg(test)]
223mod test {
224
225
226 use std::io::Cursor;
227 use std::io::Error as IoError;
228
229 use kf_protocol::Decoder;
230 use kf_protocol::Encoder;
231
232 use crate::DefaultRecord;
233 use crate::DefaultBatch;
234
235
236 #[test]
237 fn test_encode_and_decode_batch() -> Result<(),IoError> {
238
239 let record: DefaultRecord = vec![0x74,0x65,0x73,0x74].into();
240 let mut batch = DefaultBatch::default();
241 batch.records.push(record);
242 batch.header.first_timestamp = 1555478494747;
243 batch.header.max_time_stamp = 1555478494747;
244
245 let bytes = batch.as_bytes(0)?;
246 println!("batch raw bytes: {:#X?}",bytes.as_ref());
247
248 let batch = DefaultBatch::decode_from(&mut Cursor::new(bytes),0)?;
249 println!("batch: {:#?}",batch);
250
251 let decoded_record = batch.records.get(0).unwrap();
252 println!("record crc: {}",batch.header.crc);
253 assert_eq!(batch.header.crc, 1514417201);
254 if let Some(ref b) = decoded_record.value.inner_value_ref() {
255 assert_eq!(b.as_slice(),"test".to_owned().as_bytes());
256 } else {
257 assert!(false);
258 }
259
260
261 Ok(())
262
263 }
264
265 #[test]
283 fn test_records_offset() {
284
285 let mut batch = DefaultBatch::default();
286
287 batch.add_record(DefaultRecord::default());
288 batch.add_record(DefaultRecord::default());
289 batch.add_record(DefaultRecord::default());
290
291 assert_eq!(batch.records.get(0).expect("index 0 should exists").get_offset_delta(),0);
292 assert_eq!(batch.records.get(1).expect("index 1 should exists").get_offset_delta(),1);
293 assert_eq!(batch.records.get(2).expect("index 2 should exists").get_offset_delta(),2);
294 assert_eq!(batch.get_last_offset_delta(),2);
295
296 }
297
298
299}
300
301
302