1use std::fmt;
2use std::fmt::Debug;
3use std::fmt::Display;
4use std::io::Error;
5use std::io::ErrorKind;
6
7use content_inspector::{inspect, ContentType};
8use log::trace;
9use log::warn;
10
11use kf_protocol::bytes::Buf;
12use kf_protocol::bytes::BufExt;
13use kf_protocol::bytes::BufMut;
14
15use kf_protocol::Decoder;
16use kf_protocol::DecoderVarInt;
17use kf_protocol::Encoder;
18use kf_protocol::EncoderVarInt;
19use kf_protocol::Version;
20use kf_protocol_derive::Decode;
21use kf_protocol_derive::Encode;
22
23use crate::DefaultBatch;
24use crate::Offset;
25
26pub type DefaultRecord = Record<DefaultAsyncBuffer>;
27
28pub trait AsyncBuffer {
30 fn len(&self) -> usize;
31}
32
33pub trait Records {}
34
35#[derive(Default)]
36pub struct DefaultAsyncBuffer(Option<Vec<u8>>);
37
38impl DefaultAsyncBuffer {
39 pub fn new(val: Option<Vec<u8>>) -> Self {
40 DefaultAsyncBuffer(val)
41 }
42
43 pub fn inner_value(self) -> Option<Vec<u8>> {
44 self.0
45 }
46
47 pub fn inner_value_ref(&self) -> &Option<Vec<u8>> {
48 &self.0
49 }
50
51 pub fn len(&self) -> usize {
52 if self.0.is_some() {
53 self.0.as_ref().unwrap().len()
54 } else {
55 0
56 }
57 }
58
59 pub fn is_binary(&self) -> bool {
61 if let Some(value) = self.inner_value_ref() {
62 match inspect(value) {
63 ContentType::BINARY => true,
64 _ => false,
65 }
66 } else {
67 false
68 }
69 }
70
71 pub fn describe(&self) -> String {
73 if self.inner_value_ref().is_some() {
74 if self.is_binary() {
75 format!("binary: ({} bytes)", self.len())
76 } else {
77 format!("text: '{}'", self)
78 }
79 } else {
80 format!("empty: (0 bytes)")
81 }
82 }
83}
84
85impl From<Option<Vec<u8>>> for DefaultAsyncBuffer {
86 fn from(val: Option<Vec<u8>>) -> Self {
87 Self::new(val)
88 }
89}
90
91impl AsyncBuffer for DefaultAsyncBuffer {
92 fn len(&self) -> usize {
93 match self.0 {
94 Some(ref val) => val.len(),
95 None => 0,
96 }
97 }
98}
99
100impl Debug for DefaultAsyncBuffer {
101 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102 match self.0 {
103 Some(ref val) => write!(f, "{:?}", String::from_utf8_lossy(val)),
104 None => write!(f, "no values"),
105 }
106 }
107}
108
109impl Display for DefaultAsyncBuffer {
110 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
111 match self.0 {
112 Some(ref val) => write!(f, "{}", String::from_utf8_lossy(val)),
113 None => write!(f, ""),
114 }
115 }
116}
117
118impl From<String> for DefaultAsyncBuffer {
119 fn from(value: String) -> Self {
120 Self(Some(value.into_bytes()))
121 }
122}
123
124impl From<Vec<u8>> for DefaultAsyncBuffer {
125 fn from(value: Vec<u8>) -> Self {
126 Self(Some(value))
127 }
128}
129
130impl Encoder for DefaultAsyncBuffer {
131 fn write_size(&self, _version: Version) -> usize {
132 self.0.var_write_size()
133 }
134
135 fn encode<T>(&self, src: &mut T, _version: Version) -> Result<(), Error>
136 where
137 T: BufMut,
138 {
139 self.0.encode_varint(src)?;
140
141 Ok(())
142 }
143}
144
145impl Decoder for DefaultAsyncBuffer {
146 fn decode<T>(&mut self, src: &mut T, _version: Version) -> Result<(), Error>
147 where
148 T: Buf,
149 {
150 trace!("decoding default asyncbuffer");
151 self.0.decode_varint(src)?;
152 trace!("value: {:#?}", self);
153 Ok(())
154 }
155}
156
157#[derive(Default, Debug)]
158pub struct RecordSet {
159 pub batches: Vec<DefaultBatch>,
160}
161
162impl fmt::Display for RecordSet {
163 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
164 write!(f, "{} batches", self.batches.len())
165 }
166}
167
168impl RecordSet {
169 pub fn add(mut self, batch: DefaultBatch) -> Self {
170 self.batches.push(batch);
171 self
172 }
173}
174
175impl Decoder for RecordSet {
176 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
177 where
178 T: Buf,
179 {
180 trace!("raw buffer len: {}",src.remaining());
181 let mut len: i32 = 0;
182 len.decode(src, version)?;
183 trace!("Record sets decoded content len: {}", len);
184
185 if src.remaining() < len as usize {
186 return Err(Error::new(
187 ErrorKind::UnexpectedEof,
188 format!("expected message len: {} but founded {}",len,src.remaining())
189 ));
190 }
191
192 let mut buf = src.take(len as usize);
193
194 let mut count = 0;
195 while buf.remaining() > 0 {
196 trace!("decoding batches: {}, remaining bytes: {}",count,buf.remaining());
197 let mut batch = DefaultBatch::default();
198 match batch.decode(&mut buf, version) {
199 Ok(_) => self.batches.push(batch),
200 Err(err) => match err.kind() {
201 ErrorKind::UnexpectedEof => {
202 warn!("not enough bytes for batch: {}", buf.remaining());
203 return Ok(())
204 }
205 _ => {
206 warn!("problem decoding batch: {}", err);
207 return Ok(())
208 }
209 },
210 }
211 count = count + 1;
212 }
213
214
215 Ok(())
216 }
217}
218
219impl Encoder for RecordSet {
220 fn write_size(&self, version: Version) -> usize {
221 self.batches
222 .iter()
223 .fold(4, |sum, val| sum + val.write_size(version))
224 }
225
226 fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
227 where
228 T: BufMut,
229 {
230 trace!("Record set encoding");
231
232 let mut out: Vec<u8> = Vec::new();
233
234 for batch in &self.batches {
235 trace!("encoding batch..");
236 batch.encode(&mut out, version)?;
237 }
238
239 let length: i32 = out.len() as i32;
240 trace!("Record Set encode len: {}", length);
241 length.encode(dest, version)?;
242
243 dest.put_slice(&mut out);
244 Ok(())
245 }
246}
247
248#[derive(Decode, Encode, Default, Debug)]
249pub struct RecordHeader {
250 attributes: i8,
251 #[varint]
252 timestamp_delta: i64,
253 #[varint]
254 offset_delta: Offset,
255}
256
257impl RecordHeader {
258 pub fn set_offset_delta(&mut self, delta: Offset) {
259 self.offset_delta = delta;
260 }
261}
262
263#[derive(Default)]
264pub struct Record<B>
265where
266 B: Default,
267{
268 pub preamble: RecordHeader,
269 pub key: B,
270 pub value: B,
271 pub headers: i64,
272}
273
274impl<B> Record<B>
275where
276 B: Default,
277{
278 pub fn get_offset_delta(&self) -> Offset {
279 self.preamble.offset_delta
280 }
281
282 pub fn get_value(&self) -> &B {
283 &self.value
284 }
285
286 pub fn value(self) -> B {
287 self.value
288 }
289}
290
291impl<B> Debug for Record<B>
292where
293 B: AsyncBuffer + Debug + Default,
294{
295 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296 writeln!(f, "{:?}", &self.preamble)?;
297 writeln!(f, "{:?}", &self.key)?;
298 writeln!(f, "{:?}", &self.value)?;
299 write!(f, "{:?}", &self.headers)
300 }
301}
302
303impl<B> From<String> for Record<B>
304where
305 B: From<String> + Default,
306{
307 fn from(value: String) -> Self {
308 let mut record = Record::default();
309 record.value = value.into();
310 record
311 }
312}
313
314impl<B> From<Vec<u8>> for Record<B>
315where
316 B: From<Vec<u8>> + Default,
317{
318 fn from(value: Vec<u8>) -> Self {
319 let mut record = Record::default();
320 record.value = value.into();
321 record
322 }
323}
324
325impl<B> Encoder for Record<B>
326where
327 B: Encoder + Default,
328{
329 fn write_size(&self, version: Version) -> usize {
330 let inner_size = self.preamble.write_size(version)
331 + self.key.write_size(version)
332 + self.value.write_size(version)
333 + self.headers.var_write_size();
334 let len: i64 = inner_size as i64;
335 len.var_write_size() + inner_size
336 }
337
338 fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
339 where
340 T: BufMut,
341 {
342 let mut out: Vec<u8> = Vec::new();
343 self.preamble.encode(&mut out, version)?;
344 self.key.encode(&mut out, version)?;
345 self.value.encode(&mut out, version)?;
346 self.headers.encode_varint(&mut out)?;
347 let len: i64 = out.len() as i64;
348 trace!("record encode as {} bytes", len);
349 len.encode_varint(dest)?;
350 dest.put_slice(&out);
351 Ok(())
352 }
353}
354
355impl<B> Decoder for Record<B>
356where
357 B: Decoder,
358{
359 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
360 where
361 T: Buf,
362 {
363 trace!("decoding record");
364 let mut len: i64 = 0;
365 len.decode_varint(src)?;
366
367 trace!("record contains: {} bytes", len);
368
369 if (src.remaining() as i64) < len {
370 return Err(Error::new(
371 ErrorKind::UnexpectedEof,
372 "not enought for record",
373 ));
374 }
375 self.preamble.decode(src, version)?;
376 trace!("offset delta: {}", self.preamble.offset_delta);
377 self.key.decode(src, version)?;
378 self.value.decode(src, version)?;
379 self.headers.decode_varint(src)?;
380
381 Ok(())
382 }
383}
384
385#[cfg(test)]
386mod test {
387
388 use std::io::Cursor;
389 use std::io::Error as IoError;
390
391 use kf_protocol::Decoder;
392 use kf_protocol::Encoder;
393
394 use crate::DefaultRecord;
395
396 #[test]
397 fn test_decode_encode_record() -> Result<(), IoError> {
398 let data = [
399 0x14, 0x00, 0xea, 0x0e, 0x02, 0x01, 0x06, 0x64, 0x6f, 0x67, 0x00, ];
407
408 let record = DefaultRecord::decode_from(&mut Cursor::new(&data), 0)?;
409 assert_eq!(record.as_bytes(0)?.len(), data.len());
410
411 assert_eq!(record.write_size(0), data.len());
412 assert_eq!(record.get_offset_delta(), 1);
413 assert!(record.key.inner_value().is_none());
414 let val = record.value.inner_value();
415 assert!(val.is_some());
416 let value = val.unwrap();
417 assert_eq!(value.len(), 3);
418 assert_eq!(value[0], 0x64);
419
420 Ok(())
421 }
422
423 #[test]
425 fn test_decode_batch_truncation() {
426
427
428 use super::RecordSet;
429 use crate::DefaultBatch;
430 use crate::DefaultRecord;
431
432 fn create_batch() -> DefaultBatch {
433 let record: DefaultRecord = vec![0x74, 0x65, 0x73, 0x74].into();
434 let mut batch = DefaultBatch::default();
435 batch.records.push(record);
436 batch
437 }
438
439 let batches = RecordSet::default()
441 .add(create_batch())
442 .add(create_batch())
443 .add(create_batch());
444
445 const TRUNCATED: usize = 10;
446
447 let mut bytes = batches.as_bytes(0).expect("bytes");
448
449 let original_len = bytes.len();
450 let _ = bytes.split_off(original_len-TRUNCATED); let body = bytes.split_off(4); let new_len = (original_len - TRUNCATED - 4) as i32;
454 let mut out = vec![];
455 new_len.encode(&mut out, 0).expect("encoding");
456 out.extend_from_slice(&body);
457
458 assert_eq!(out.len(),original_len - TRUNCATED);
459
460 println!("decoding...");
461 let decoded_batches = RecordSet::decode_from(&mut Cursor::new(out), 0).expect("decoding");
462 assert_eq!(decoded_batches.batches.len(), 2);
463 }
464}