kf_protocol_api/
record.rs

1use std::fmt;
2use std::fmt::Debug;
3use std::fmt::Display;
4use std::io::Error;
5use std::io::ErrorKind;
6
7use content_inspector::{inspect, ContentType};
8use log::trace;
9use log::warn;
10
11use kf_protocol::bytes::Buf;
12use kf_protocol::bytes::BufExt;
13use kf_protocol::bytes::BufMut;
14
15use kf_protocol::Decoder;
16use kf_protocol::DecoderVarInt;
17use kf_protocol::Encoder;
18use kf_protocol::EncoderVarInt;
19use kf_protocol::Version;
20use kf_protocol_derive::Decode;
21use kf_protocol_derive::Encode;
22
23use crate::DefaultBatch;
24use crate::Offset;
25
/// Record whose key and value are both stored as `DefaultAsyncBuffer`.
pub type DefaultRecord = Record<DefaultAsyncBuffer>;
27
/// Buffer abstraction that can be used in an async context.
pub trait AsyncBuffer {
    /// Length of the buffer as reported by the implementor.
    fn len(&self) -> usize;

    /// Returns `true` when `len()` is zero.
    ///
    /// Provided with a default body so existing implementors keep compiling
    /// without changes.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
32
/// Marker trait: declares no methods or associated items.
pub trait Records {}
34
35#[derive(Default)]
36pub struct DefaultAsyncBuffer(Option<Vec<u8>>);
37
38impl DefaultAsyncBuffer {
39    pub fn new(val: Option<Vec<u8>>) -> Self {
40        DefaultAsyncBuffer(val)
41    }
42
43    pub fn inner_value(self) -> Option<Vec<u8>> {
44        self.0
45    }
46
47    pub fn inner_value_ref(&self) -> &Option<Vec<u8>> {
48        &self.0
49    }
50
51    pub fn len(&self) -> usize {
52        if self.0.is_some() {
53            self.0.as_ref().unwrap().len()
54        } else {
55            0
56        }
57    }
58
59    /// Check if value is binary content
60    pub fn is_binary(&self) -> bool {
61        if let Some(value) = self.inner_value_ref() {
62            match inspect(value) {
63                ContentType::BINARY => true,
64                _ => false,
65            }
66        } else {
67            false
68        }
69    }
70
71    /// Describe value - return text, binary, or 0 bytes
72    pub fn describe(&self) -> String {
73        if self.inner_value_ref().is_some() {
74            if self.is_binary() {
75                format!("binary: ({} bytes)", self.len())
76            } else {
77                format!("text: '{}'", self)
78            }
79        } else {
80            format!("empty: (0 bytes)")
81        }
82    }
83}
84
85impl From<Option<Vec<u8>>> for DefaultAsyncBuffer {
86    fn from(val: Option<Vec<u8>>) -> Self {
87        Self::new(val)
88    }
89}
90
91impl AsyncBuffer for DefaultAsyncBuffer {
92    fn len(&self) -> usize {
93        match self.0 {
94            Some(ref val) => val.len(),
95            None => 0,
96        }
97    }
98}
99
100impl Debug for DefaultAsyncBuffer {
101    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102        match self.0 {
103            Some(ref val) => write!(f, "{:?}", String::from_utf8_lossy(val)),
104            None => write!(f, "no values"),
105        }
106    }
107}
108
109impl Display for DefaultAsyncBuffer {
110    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
111        match self.0 {
112            Some(ref val) => write!(f, "{}", String::from_utf8_lossy(val)),
113            None => write!(f, ""),
114        }
115    }
116}
117
118impl From<String> for DefaultAsyncBuffer {
119    fn from(value: String) -> Self {
120        Self(Some(value.into_bytes()))
121    }
122}
123
124impl From<Vec<u8>> for DefaultAsyncBuffer {
125    fn from(value: Vec<u8>) -> Self {
126        Self(Some(value))
127    }
128}
129
130impl Encoder for DefaultAsyncBuffer {
131    fn write_size(&self, _version: Version) -> usize {
132        self.0.var_write_size()
133    }
134
135    fn encode<T>(&self, src: &mut T, _version: Version) -> Result<(), Error>
136    where
137        T: BufMut,
138    {
139        self.0.encode_varint(src)?;
140
141        Ok(())
142    }
143}
144
145impl Decoder for DefaultAsyncBuffer {
146    fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
147    where
148        T: Buf,
149    {
150        trace!("decoding default asyncbuffer");
151        self.0.decode_varint(src)?;
152        trace!("value: {:#?}", self);
153        Ok(())
154    }
155}
156
/// Ordered collection of batches that is encoded and decoded as one
/// length-prefixed unit.
#[derive(Default, Debug)]
pub struct RecordSet {
    /// Batches in the order they were added or decoded.
    pub batches: Vec<DefaultBatch>,
}
161
162impl fmt::Display for RecordSet {
163    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
164        write!(f, "{} batches", self.batches.len())
165    }
166}
167
168impl RecordSet {
169    pub fn add(mut self, batch: DefaultBatch) -> Self {
170        self.batches.push(batch);
171        self
172    }
173}
174
175impl Decoder for RecordSet {
176    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
177    where
178        T: Buf,
179    {
180        trace!("raw buffer len: {}",src.remaining());
181        let mut len: i32 = 0;
182        len.decode(src, version)?;
183        trace!("Record sets decoded content len: {}", len);
184
185        if src.remaining() < len as usize {
186            return Err(Error::new(
187                ErrorKind::UnexpectedEof,
188                format!("expected message len: {} but founded {}",len,src.remaining())
189            ));
190        }
191
192        let mut buf = src.take(len as usize);
193
194        let mut count = 0;
195        while buf.remaining() > 0 {
196            trace!("decoding batches: {}, remaining bytes: {}",count,buf.remaining());
197            let mut batch = DefaultBatch::default();
198            match batch.decode(&mut buf, version) {
199                Ok(_) => self.batches.push(batch),
200                Err(err) => match err.kind() {
201                    ErrorKind::UnexpectedEof => {
202                        warn!("not enough bytes for batch: {}", buf.remaining());
203                        return Ok(())
204                    }
205                    _ => {
206                        warn!("problem decoding batch: {}", err);
207                        return Ok(())
208                    }
209                },
210            }
211            count = count + 1;
212        }
213
214        
215        Ok(())
216    }
217}
218
219impl Encoder for RecordSet {
220    fn write_size(&self, version: Version) -> usize {
221        self.batches
222            .iter()
223            .fold(4, |sum, val| sum + val.write_size(version))
224    }
225
226    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
227    where
228        T: BufMut,
229    {
230        trace!("Record set encoding");
231
232        let mut out: Vec<u8> = Vec::new();
233
234        for batch in &self.batches {
235            trace!("encoding batch..");
236            batch.encode(&mut out, version)?;
237        }
238
239        let length: i32 = out.len() as i32;
240        trace!("Record Set encode len: {}", length);
241        length.encode(dest, version)?;
242
243        dest.put_slice(&mut out);
244        Ok(())
245    }
246}
247
/// Leading fields of a record: attributes, timestamp delta and offset delta.
///
/// Stored as the `preamble` of a `Record`, where it is encoded before the key
/// and value. Encoding and decoding come from the `Encode`/`Decode` derives.
#[derive(Decode, Encode, Default, Debug)]
pub struct RecordHeader {
    attributes: i8,
    // NOTE(review): `#[varint]` presumably selects variable-length integer
    // encoding in the derive macros — confirm against kf_protocol_derive.
    #[varint]
    timestamp_delta: i64,
    #[varint]
    offset_delta: Offset,
}

impl RecordHeader {
    /// Overwrites the stored offset delta with `delta`.
    pub fn set_offset_delta(&mut self, delta: Offset) {
        self.offset_delta = delta;
    }
}
262
/// A single record: a header preamble, a key, a value and a `headers` integer.
///
/// `B` is the buffer type used for both the key and the value.
#[derive(Default)]
pub struct Record<B>
where
    B: Default,
{
    pub preamble: RecordHeader,
    pub key: B,
    pub value: B,
    // Written and read as a single varint by the Encoder/Decoder impls in this file.
    // NOTE(review): presumably the header count — no header entries are parsed here; confirm.
    pub headers: i64,
}

impl<B> Record<B>
where
    B: Default,
{
    /// Returns the offset delta stored in the preamble.
    pub fn get_offset_delta(&self) -> Offset {
        self.preamble.offset_delta
    }

    /// Borrows the record's value.
    pub fn get_value(&self) -> &B {
        &self.value
    }

    /// Consumes the record and returns its value; the other fields are dropped.
    pub fn value(self) -> B {
        self.value
    }
}
290
291impl<B> Debug for Record<B>
292where
293    B: AsyncBuffer + Debug + Default,
294{
295    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296        writeln!(f, "{:?}", &self.preamble)?;
297        writeln!(f, "{:?}", &self.key)?;
298        writeln!(f, "{:?}", &self.value)?;
299        write!(f, "{:?}", &self.headers)
300    }
301}
302
303impl<B> From<String> for Record<B>
304where
305    B: From<String> + Default,
306{
307    fn from(value: String) -> Self {
308        let mut record = Record::default();
309        record.value = value.into();
310        record
311    }
312}
313
314impl<B> From<Vec<u8>> for Record<B>
315where
316    B: From<Vec<u8>> + Default,
317{
318    fn from(value: Vec<u8>) -> Self {
319        let mut record = Record::default();
320        record.value = value.into();
321        record
322    }
323}
324
325impl<B> Encoder for Record<B>
326where
327    B: Encoder + Default,
328{
329    fn write_size(&self, version: Version) -> usize {
330        let inner_size = self.preamble.write_size(version)
331            + self.key.write_size(version)
332            + self.value.write_size(version)
333            + self.headers.var_write_size();
334        let len: i64 = inner_size as i64;
335        len.var_write_size() + inner_size
336    }
337
338    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
339    where
340        T: BufMut,
341    {
342        let mut out: Vec<u8> = Vec::new();
343        self.preamble.encode(&mut out, version)?;
344        self.key.encode(&mut out, version)?;
345        self.value.encode(&mut out, version)?;
346        self.headers.encode_varint(&mut out)?;
347        let len: i64 = out.len() as i64;
348        trace!("record encode as {} bytes", len);
349        len.encode_varint(dest)?;
350        dest.put_slice(&out);
351        Ok(())
352    }
353}
354
355impl<B> Decoder for Record<B>
356where
357    B: Decoder,
358{
359    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
360    where
361        T: Buf,
362    {
363        trace!("decoding record");
364        let mut len: i64 = 0;
365        len.decode_varint(src)?;
366
367        trace!("record contains: {} bytes", len);
368
369        if (src.remaining() as i64) < len {
370            return Err(Error::new(
371                ErrorKind::UnexpectedEof,
372                "not enought for record",
373            ));
374        }
375        self.preamble.decode(src, version)?;
376        trace!("offset delta: {}", self.preamble.offset_delta);
377        self.key.decode(src, version)?;
378        self.value.decode(src, version)?;
379        self.headers.decode_varint(src)?;
380
381        Ok(())
382    }
383}
384
#[cfg(test)]
mod test {

    use std::io::Cursor;
    use std::io::Error as IoError;

    use kf_protocol::Decoder;
    use kf_protocol::Encoder;

    use crate::DefaultRecord;

    /// Decodes a hand-written record, checks the decoded fields, and checks
    /// that re-encoding yields the same number of bytes.
    #[test]
    fn test_decode_encode_record() -> Result<(), IoError> {
        let data = [
            0x14, // record length: decodes to 10, the number of bytes that follow
            0x00, // attributes
            0xea, 0x0e, // timestamp delta
            0x02, // offset delta, decodes to 1 (asserted below)
            0x01, // key: decodes to no key (asserted below)
            0x06, 0x64, 0x6f, 0x67, // value, 3 bytes len (dog)
            0x00, // 0 header
        ];

        let record = DefaultRecord::decode_from(&mut Cursor::new(&data), 0)?;
        // Round trip: the encoded form must be as long as the input.
        assert_eq!(record.as_bytes(0)?.len(), data.len());

        assert_eq!(record.write_size(0), data.len());
        assert_eq!(record.get_offset_delta(), 1);
        assert!(record.key.inner_value().is_none());
        let val = record.value.inner_value();
        assert!(val.is_some());
        let value = val.unwrap();
        assert_eq!(value.len(), 3);
        assert_eq!(value[0], 0x64);

        Ok(())
    }

    /// test decoding of records when one of the batch was truncated
    ///
    /// Three batches are encoded, the last 10 bytes are cut off and the
    /// length prefix is rewritten to match; decoding must then succeed and
    /// yield only the two complete batches.
    #[test]
    fn test_decode_batch_truncation() {


        use super::RecordSet;
        use crate::DefaultBatch;
        use crate::DefaultRecord;

        // Batch holding a single record whose value is the bytes of "test".
        fn create_batch() -> DefaultBatch {
            let record: DefaultRecord = vec![0x74, 0x65, 0x73, 0x74].into();
            let mut batch = DefaultBatch::default();
            batch.records.push(record);
            batch
        }

        // add 3 batches
        let batches = RecordSet::default()
            .add(create_batch())
            .add(create_batch())
            .add(create_batch());

        // Number of bytes removed from the end of the encoded record set.
        const TRUNCATED: usize = 10;

        let mut bytes = batches.as_bytes(0).expect("bytes");

        let original_len = bytes.len();
        let _ = bytes.split_off(original_len-TRUNCATED); // truncate record sets
        let body = bytes.split_off(4);  // split off body so we can manipulate len

        // Rebuild the input with a length prefix that matches the truncated body.
        let new_len = (original_len - TRUNCATED - 4) as i32;
        let mut out = vec![];
        new_len.encode(&mut out, 0).expect("encoding");
        out.extend_from_slice(&body);

        assert_eq!(out.len(),original_len - TRUNCATED);

        println!("decoding...");
        let decoded_batches = RecordSet::decode_from(&mut Cursor::new(out), 0).expect("decoding");
        // The truncated third batch is dropped; the first two survive.
        assert_eq!(decoded_batches.batches.len(), 2);
    }
}