kf_protocol_api/
batch.rs

1
2use std::io::Error;
3use std::mem::size_of;
4use std::fmt::Debug;
5
6use log::trace;
7use crc32c;
8
9use kf_protocol::bytes::Buf;
10use kf_protocol::bytes::BufMut;
11
12use kf_protocol::Decoder;
13use kf_protocol::Encoder;
14use kf_protocol::Version;
15use kf_protocol_derive::Decode;
16
17use crate::Offset;
18use crate::Size;
19use super::DefaultRecord;
20
/// Record container used by the default batch: a plain vector of `DefaultRecord`.
pub type DefaultBatchRecords = Vec<DefaultRecord>;
/// A `Batch` whose records are stored in a `DefaultBatchRecords` vector.
pub type DefaultBatch = Batch<DefaultBatchRecords>;
23
24
/// Contract for the record container held by a `Batch`: it must be
/// default-constructible, debug-printable, encodable and decodable.
pub trait BatchRecords: Default + Debug + Encoder + Decoder {

    /// Reports how many bytes the records want to process, given the
    /// `remainder` byte count supplied by the caller.
    ///
    /// The default implementation returns `remainder` unchanged;
    /// implementors may override it.
    fn remainder_bytes(&self,remainder: usize ) -> usize {
        remainder
    }

}
33
34
// The plain `Vec<DefaultRecord>` container relies on the default `remainder_bytes`.
impl BatchRecords for DefaultBatchRecords {}
36
37
38
/// Size in bytes of the batch preamble: the base offset followed by the
/// `i32` batch length.
pub const BATCH_PREAMBLE_SIZE: usize =  
        size_of::<Offset>()     // base offset
        + size_of::<i32>();       // batch length
43
44
/// A batch of records together with its preamble (base offset, length) and header.
///
/// `R` is the record container and must implement `BatchRecords`.
#[derive(Default,Debug)]
pub struct Batch<R> where R: BatchRecords {
    /// Offset of the batch; `get_last_offset` adds the header's
    /// `last_offset_delta` to this value.
    pub base_offset: Offset,
    /// Length field filled in while decoding. `encode` does not write this
    /// value; it recomputes the length from the header size and the records.
    pub batch_len: i32,       // only for decoding
    /// Fixed header fields of the batch.
    pub header: BatchHeader,
    /// The records carried by this batch.
    pub records: R
}
52
53impl <R>Batch<R> where R: BatchRecords {
54
55    pub fn get_mut_header(&mut self) -> &mut BatchHeader {
56        &mut self.header
57    }
58
59    pub fn get_header(&self) -> &BatchHeader {
60        &self.header
61    }
62
63    pub fn get_base_offset(&self) -> Offset {
64        self.base_offset
65    }
66
67    pub fn set_base_offset(&mut self,offset: Offset)  {
68        self.base_offset = offset;
69    }
70
71    pub fn base_offset(mut self, offset: Offset) -> Self {
72        self.base_offset = offset;
73        self
74    }
75
76    pub fn set_offset_delta(&mut self,delta: i32) {
77        self.header.last_offset_delta = delta;
78    }
79
80    pub fn get_last_offset(&self) -> Offset {
81        self.get_base_offset() + self.get_last_offset_delta() as Offset
82    }
83    
84
85    /// get last offset delta
86    pub fn get_last_offset_delta(&self) -> Size {
87        self.get_header().last_offset_delta as Size
88    }
89
90    /// decode from buf stored in the file
91    /// read all excluding records
92    pub fn decode_from_file_buf<T>(&mut self, src: &mut T,version: Version) -> Result<(), Error> where T: Buf,
93    {
94        trace!("decoding premable");
95        self.base_offset.decode(src,version)?;
96        self.batch_len.decode(src,version)?;
97        self.header.decode(src,version)?;
98        Ok(())
99    }
100
101
102}
103
104
105
106
107impl Batch<DefaultBatchRecords>  {
108
109
110    /// add new record, this will update the offset to correct
111    pub fn add_record(&mut self,mut record: DefaultRecord) {
112        let last_offset_delta = if self.records.len() == 0 { 0 } else { self.records.len() as Offset };
113        record.preamble.set_offset_delta(last_offset_delta);
114        self.header.last_offset_delta = last_offset_delta as i32;
115        self.records.push(record)
116    }
117
118}
119
120
121
122impl <R>Decoder for Batch<R> where R: BatchRecords  {
123
124    fn decode<T>(&mut self, src: &mut T,version: Version) -> Result<(), Error> where T: Buf,
125    {
126        trace!("decoding batch");
127        self.decode_from_file_buf(src,version)?;
128        self.records.decode(src,version)?;
129        Ok(())
130    }
131}
132
133
134
135// Record batch contains 12 bytes of pre-amble plus header + records
136impl <R>Encoder for Batch<R>  where R: BatchRecords {
137
138    fn write_size(&self,version: Version) -> usize {
139        BATCH_PREAMBLE_SIZE + BATCH_HEADER_SIZE + self.records.write_size(version)
140    }
141
142    fn encode<T>(&self, dest: &mut T,version: Version) -> Result<(), Error> where T: BufMut
143    {
144        trace!("Encoding Batch");
145        self.base_offset.encode(dest,version)?;
146        let batch_len: i32 = (BATCH_HEADER_SIZE + self.records.write_size(version)) as i32;
147        batch_len.encode(dest,version)?;
148
149        // encode parts of header
150        self.header.partition_leader_epoch.encode(dest,version)?;
151        self.header.magic.encode(dest,version)?;
152
153
154        let mut out: Vec<u8> = Vec::new();
155        let buf = &mut out;
156        self.header.attributes.encode(buf,version)?;
157        self.header.last_offset_delta.encode(buf,version)?;
158        self.header.first_timestamp.encode(buf,version)?;
159        self.header.max_time_stamp.encode(buf,version)?;
160        self.header.producer_id.encode(buf,version)?;
161        self.header.producer_epoch.encode(buf,version)?;
162        self.header.first_sequence.encode(buf,version)?;
163        self.records.encode(buf,version)?;
164
165        let crc = crc32c::crc32c(&out);
166        crc.encode(dest,version)?;
167        dest.put_slice(&out);        
168        Ok(())
169    }
170}
171
172
/// Fixed-size header of a record batch.
///
/// NOTE(review): `Decode` is derived, so decoding presumably reads the fields
/// in declaration order — keep this order in sync with `Batch::encode`.
#[derive(Debug,Decode)]
pub struct BatchHeader {
    pub partition_leader_epoch: i32,
    // `Default` sets this to 2.
    pub magic: i8,
    // Filled in while decoding. `Batch::encode` ignores this field and writes
    // a freshly computed CRC-32C of everything from `attributes` through the
    // encoded records.
    pub crc: u32,
    pub attributes: i16,
    // Updated by `Batch::set_offset_delta` and `Batch::add_record`.
    pub last_offset_delta: i32,
    pub first_timestamp: i64,
    pub max_time_stamp: i64,
    pub producer_id: i64,
    pub producer_epoch: i16,
    pub first_sequence: i32,
}
186
187
188impl Default for BatchHeader {  
189
190    fn default() -> Self {
191        BatchHeader {
192            partition_leader_epoch: -1,
193            magic: 2,
194            crc: 0,
195            attributes: 0,
196            last_offset_delta: 0,
197            first_timestamp: 0,
198            max_time_stamp: 0,
199            producer_id: -1,
200            producer_epoch: -1,
201            first_sequence: -1,
202        }
203    }
204
205}
206
/// Sum of `size_of` for every `BatchHeader` field type, in declaration order
/// (45 bytes). Used as the header's contribution to the encoded batch size.
#[allow(dead_code)]
pub const BATCH_HEADER_SIZE: usize = size_of::<i32>() // partition_leader_epoch
    + size_of::<i8>() // magic
    + size_of::<u32>() // crc
    + size_of::<i16>() // attributes
    + size_of::<i32>() // last_offset_delta
    + size_of::<i64>() // first_timestamp
    + size_of::<i64>() // max_time_stamp
    + size_of::<i64>() // producer_id
    + size_of::<i16>() // producer_epoch
    + size_of::<i32>(); // first_sequence
219
220
221
#[cfg(test)]
mod test {

    use std::io::Cursor;
    use std::io::Error as IoError;

    use kf_protocol::Decoder;
    use kf_protocol::Encoder;

    use crate::DefaultRecord;
    use crate::DefaultBatch;

    /// Encodes a single-record batch, decodes it again, and checks both the
    /// CRC found in the decoded header and the record payload.
    #[test]
    fn test_encode_and_decode_batch() -> Result<(), IoError> {
        // 0x74 0x65 0x73 0x74 is the ASCII string "test".
        let record: DefaultRecord = vec![0x74, 0x65, 0x73, 0x74].into();
        let mut batch = DefaultBatch::default();
        batch.records.push(record);
        batch.header.first_timestamp = 1555478494747;
        batch.header.max_time_stamp = 1555478494747;

        let bytes = batch.as_bytes(0)?;
        println!("batch raw bytes: {:#X?}", bytes.as_ref());

        let batch = DefaultBatch::decode_from(&mut Cursor::new(bytes), 0)?;
        println!("batch: {:#?}", batch);

        let decoded_record = batch
            .records
            .get(0)
            .expect("decoded batch should contain one record");
        println!("record crc: {}", batch.header.crc);
        assert_eq!(batch.header.crc, 1514417201);
        if let Some(ref b) = decoded_record.value.inner_value_ref() {
            assert_eq!(b.as_slice(), "test".as_bytes());
        } else {
            panic!("decoded record should carry a value");
        }

        Ok(())
    }

    // Reference capture for the test above: the bytes `5a 44 2c 31` at offset
    // 0x94 are the CRC 0x5A442C31 = 1514417201 asserted in
    // `test_encode_and_decode_batch`, and the dump ends with the "test" payload.
    /*  raw batch encoded

    0000   02 00 00 00 45 00 00 c7 00 00 40 00 40 06 00 00
    0010   c0 a8 07 30 c0 a8 07 30 d1 b9 23 84 29 ba 3d 48
    0020   0b 13 89 98 80 18 97 62 90 6a 00 00 01 01 08 0a
    0030   1e 6f 09 0d 1e 6f 09 06 00 00 00 8f 00 00 00 05
    0040   00 00 00 03 00 10 63 6f 6e 73 6f 6c 65 2d 70 72
    0050   6f 64 75 63 65 72 ff ff 00 01 00 00 05 dc 00 00
    0060   00 01 00 13 6d 79 2d 72 65 70 6c 69 63 61 74 65
    0070   64 2d 74 6f 70 69 63 00 00 00 01 00 00 00 00 00
    0080   00 00 48 00 00 00 00 00 00 00 00 00 00 00 3c ff
    0090   ff ff ff 02 5a 44 2c 31 00 00 00 00 00 00 00 00
    00a0   01 6a 29 be 3e 1b 00 00 01 6a 29 be 3e 1b ff ff
    00b0   ff ff ff ff ff ff ff ff ff ff ff ff 00 00 00 01
    00c0   14 00 00 00 01 08 74 65 73 74 00
    */

    /// `add_record` must number records 0, 1, 2, ... and leave the header's
    /// last offset delta equal to the delta of the most recently added record.
    #[test]
    fn test_records_offset() {
        let mut batch = DefaultBatch::default();

        batch.add_record(DefaultRecord::default());
        batch.add_record(DefaultRecord::default());
        batch.add_record(DefaultRecord::default());

        assert_eq!(batch.records.get(0).expect("index 0 should exists").get_offset_delta(), 0);
        assert_eq!(batch.records.get(1).expect("index 1 should exists").get_offset_delta(), 1);
        assert_eq!(batch.records.get(2).expect("index 2 should exists").get_offset_delta(), 2);
        assert_eq!(batch.get_last_offset_delta(), 2);
    }
}
300
301
302