1use std::fmt;
2use std::fmt::Debug;
3use std::fmt::Display;
4use std::io::Error;
5use std::io::ErrorKind;
6use std::str::Utf8Error;
7
8use content_inspector::{inspect, ContentType};
9use tracing::{trace, warn};
10use once_cell::sync::Lazy;
11
12use bytes::Buf;
13use bytes::BufMut;
14
15use crate::batch::BatchRecords;
16use crate::batch::MemoryRecords;
17use crate::batch::NO_TIMESTAMP;
18use crate::batch::RawRecords;
19use crate::core::{Encoder, Decoder};
20use crate::core::DecoderVarInt;
21use crate::core::EncoderVarInt;
22use crate::core::Version;
23
24use crate::batch::Batch;
25use crate::Offset;
26
27use fluvio_compression::CompressionError;
28use fluvio_types::Timestamp;
29
30static MAX_STRING_DISPLAY: Lazy<usize> = Lazy::new(|| {
32 let var_value = std::env::var("FLV_MAX_STRING_DISPLAY").unwrap_or_default();
33 var_value.parse().unwrap_or(16384)
34});
35
36pub struct RecordKey(RecordKeyInner);
52
53impl RecordKey {
54 pub const NULL: Self = Self(RecordKeyInner::Null);
55
56 fn into_option(self) -> Option<RecordData> {
57 match self.0 {
58 RecordKeyInner::Key(key) => Some(key),
59 RecordKeyInner::Null => None,
60 }
61 }
62
63 #[doc(hidden)]
64 pub fn from_option(key: Option<RecordData>) -> Self {
65 let inner = match key {
66 Some(key) => RecordKeyInner::Key(key),
67 None => RecordKeyInner::Null,
68 };
69 Self(inner)
70 }
71}
72
73enum RecordKeyInner {
74 Null,
75 Key(RecordData),
76}
77
78impl<K: Into<Vec<u8>>> From<K> for RecordKey {
79 fn from(k: K) -> Self {
80 Self(RecordKeyInner::Key(RecordData::from(k)))
81 }
82}
83
84#[derive(Clone, Default, PartialEq)]
94pub struct RecordData(Bytes);
95
96impl RecordData {
97 pub fn len(&self) -> usize {
98 self.0.len()
99 }
100
101 pub fn is_binary(&self) -> bool {
103 matches!(inspect(&self.0), ContentType::BINARY)
104 }
105
106 pub fn describe(&self) -> String {
108 if self.is_binary() {
109 format!("binary: ({} bytes)", self.len())
110 } else {
111 format!("text: '{}'", self)
112 }
113 }
114
115 pub fn as_str(&self) -> Result<&str, Utf8Error> {
117 std::str::from_utf8(self.as_ref())
118 }
119}
120
121impl<V: Into<Vec<u8>>> From<V> for RecordData {
122 fn from(value: V) -> Self {
123 Self(Bytes::from(value.into()))
124 }
125}
126
127impl AsRef<[u8]> for RecordData {
128 fn as_ref(&self) -> &[u8] {
129 self.0.as_ref()
130 }
131}
132
133impl Debug for RecordData {
134 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
135 let value = &self.0;
136 if matches!(inspect(value), ContentType::BINARY) {
137 write!(f, "values binary: ({} bytes)", self.len())
138 } else if value.len() < *MAX_STRING_DISPLAY {
139 write!(f, "{}", String::from_utf8_lossy(value))
140 } else {
141 write!(
142 f,
143 "{}...",
144 String::from_utf8_lossy(&value[0..*MAX_STRING_DISPLAY])
145 )
146 }
147 }
148}
149
150impl Display for RecordData {
151 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
152 let value = &self.0;
153 if matches!(inspect(value), ContentType::BINARY) {
154 write!(f, "binary: ({} bytes)", self.len())
155 } else if value.len() < *MAX_STRING_DISPLAY {
156 write!(f, "{}", String::from_utf8_lossy(value))
157 } else {
158 write!(
159 f,
160 "{}...",
161 String::from_utf8_lossy(&value[0..*MAX_STRING_DISPLAY])
162 )
163 }
164 }
165}
166
167impl Encoder for RecordData {
168 fn write_size(&self, version: Version) -> usize {
169 let len = self.0.len() as i64;
170 self.0.iter().fold(len.var_write_size(), |sum, val| {
171 sum + val.write_size(version)
172 })
173 }
174
175 fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
176 where
177 T: BufMut,
178 {
179 let len: i64 = self.0.len() as i64;
180 len.encode_varint(dest)?;
181 for v in self.0.iter() {
182 v.encode(dest, version)?;
183 }
184 Ok(())
185 }
186}
187
188impl Decoder for RecordData {
189 fn decode<T>(&mut self, src: &mut T, _: Version) -> Result<(), Error>
190 where
191 T: Buf,
192 {
193 trace!("decoding default asyncbuffer");
194
195 let mut len: i64 = 0;
196 len.decode_varint(src)?;
197 let len = len as usize;
198
199 let slice = src.take(len);
201 let mut bytes = BytesMut::with_capacity(len);
202 bytes.put(slice);
203
204 self.0 = bytes.freeze();
206 Ok(())
207 }
208}
209
210#[derive(Default, Debug)]
213pub struct RecordSet<R = MemoryRecords> {
214 pub batches: Vec<Batch<R>>,
215}
216
217impl TryFrom<RecordSet> for RecordSet<RawRecords> {
218 type Error = CompressionError;
219 fn try_from(set: RecordSet) -> Result<Self, Self::Error> {
220 let batches: Result<Vec<_>, _> = set
221 .batches
222 .into_iter()
223 .map(|batch| batch.try_into())
224 .collect();
225 Ok(Self { batches: batches? })
226 }
227}
228
229impl fmt::Display for RecordSet {
230 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
231 write!(f, "{} batches", self.batches.len())
232 }
233}
234
235impl<R: BatchRecords> RecordSet<R> {
236 pub fn add(mut self, batch: Batch<R>) -> Self {
237 self.batches.push(batch);
238 self
239 }
240
241 pub fn last_offset(&self) -> Option<Offset> {
243 self.batches
244 .last()
245 .map(|batch| batch.computed_last_offset())
246 }
247
248 pub fn total_records(&self) -> usize {
250 self.batches
251 .iter()
252 .map(|batches| batches.records_len())
253 .sum()
254 }
255
256 pub fn base_offset(&self) -> Offset {
258 self.batches
259 .first()
260 .map(|batches| batches.base_offset)
261 .unwrap_or_else(|| -1)
262 }
263}
264
265impl<R: BatchRecords> Decoder for RecordSet<R> {
266 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
267 where
268 T: Buf,
269 {
270 trace!(len = src.remaining(), "raw buffer size");
271 let mut len: i32 = 0;
272 len.decode(src, version)?;
273 trace!(len, "Record sets decoded content");
274
275 if src.remaining() < len as usize {
276 return Err(Error::new(
277 ErrorKind::UnexpectedEof,
278 format!(
279 "expected message len: {} but founded {}",
280 len,
281 src.remaining()
282 ),
283 ));
284 }
285
286 let mut buf = src.take(len as usize);
287
288 let mut count = 0;
289 while buf.remaining() > 0 {
290 trace!(count, remaining = buf.remaining(), "decoding batches");
291 let mut batch = Batch::default();
292 match batch.decode(&mut buf, version) {
293 Ok(_) => self.batches.push(batch),
294 Err(err) => match err.kind() {
295 ErrorKind::UnexpectedEof => {
296 warn!(
297 len,
298 remaining = buf.remaining(),
299 version,
300 count,
301 "not enough bytes for decoding batch from recordset"
302 );
303 return Ok(());
304 }
305 _ => {
306 warn!("problem decoding batch: {}", err);
307 return Ok(());
308 }
309 },
310 }
311 count += 1;
312 }
313
314 Ok(())
315 }
316}
317
318impl<R: BatchRecords> Encoder for RecordSet<R> {
319 fn write_size(&self, version: Version) -> usize {
320 self.batches
321 .iter()
322 .fold(4, |sum, val| sum + val.write_size(version))
323 }
324
325 fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
326 where
327 T: BufMut,
328 {
329 trace!("Record set encoding");
330
331 let mut out: Vec<u8> = Vec::new();
332
333 for batch in &self.batches {
334 trace!("encoding batch..");
335 batch.encode(&mut out, version)?;
336 }
337
338 let length: i32 = out.len() as i32;
339 trace!("Record Set encode len: {}", length);
340 length.encode(dest, version)?;
341
342 dest.put_slice(&out);
343 Ok(())
344 }
345}
346
347impl<R: Clone> Clone for RecordSet<R> {
348 fn clone(&self) -> Self {
349 Self {
350 batches: self.batches.clone(),
351 }
352 }
353}
354
355#[derive(Decoder, Default, Encoder, Debug, Clone)]
356pub struct RecordHeader {
357 attributes: i8,
358 #[varint]
359 timestamp_delta: Timestamp,
360 #[varint]
361 offset_delta: Offset,
362}
363
364impl RecordHeader {
365 pub fn set_offset_delta(&mut self, delta: Offset) {
366 self.offset_delta = delta;
367 }
368
369 pub fn offset_delta(&self) -> Offset {
370 self.offset_delta
371 }
372
373 #[cfg(feature = "memory_batch")]
374 pub(crate) fn set_timestamp_delta(&mut self, delta: Timestamp) {
375 self.timestamp_delta = delta;
376 }
377}
378
379#[derive(Default, Clone)]
380pub struct Record<B = RecordData> {
381 pub preamble: RecordHeader,
382 pub key: Option<B>,
383 pub value: B,
384 pub headers: i64,
385}
386
387impl<B: Default> Record<B> {
388 pub fn get_offset_delta(&self) -> Offset {
389 self.preamble.offset_delta
390 }
391
392 pub fn add_base_offset(&mut self, relative_base_offset: Offset) {
394 self.preamble.offset_delta += relative_base_offset;
395 }
396
397 pub fn value(&self) -> &B {
399 &self.value
400 }
401
402 pub fn key(&self) -> Option<&B> {
404 self.key.as_ref()
405 }
406
407 pub fn into_value(self) -> B {
409 self.value
410 }
411
412 pub fn into_key(self) -> Option<B> {
414 self.key
415 }
416}
417
418impl Record {
419 pub fn new<V>(value: V) -> Self
420 where
421 V: Into<RecordData>,
422 {
423 Record {
424 value: value.into(),
425 ..Default::default()
426 }
427 }
428
429 pub fn new_key_value<K, V>(key: K, value: V) -> Self
430 where
431 K: Into<RecordKey>,
432 V: Into<RecordData>,
433 {
434 let key = key.into().into_option();
435 Record {
436 key,
437 value: value.into(),
438 ..Default::default()
439 }
440 }
441
442 pub(crate) fn timestamp_delta(&self) -> Timestamp {
443 self.preamble.timestamp_delta
444 }
445}
446
447impl<K, V> From<(K, V)> for Record
448where
449 K: Into<RecordKey>,
450 V: Into<RecordData>,
451{
452 fn from((key, value): (K, V)) -> Self {
453 Self::new_key_value(key, value)
454 }
455}
456
457impl<B: Debug> Debug for Record<B> {
458 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
459 f.debug_struct("Record")
460 .field("preamble", &self.preamble)
461 .field("key", &self.key)
462 .field("value", &self.value)
463 .field("headers", &self.headers)
464 .finish()
465 }
466}
467
468impl<B> Encoder for Record<B>
469where
470 B: Encoder + Default,
471{
472 fn write_size(&self, version: Version) -> usize {
473 let inner_size = self.preamble.write_size(version)
474 + self.key.write_size(version)
475 + self.value.write_size(version)
476 + self.headers.var_write_size();
477 let len: i64 = inner_size as i64;
478 len.var_write_size() + inner_size
479 }
480
481 fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
482 where
483 T: BufMut,
484 {
485 let mut out: Vec<u8> = Vec::new();
486 self.preamble.encode(&mut out, version)?;
487 self.key.encode(&mut out, version)?;
488 self.value.encode(&mut out, version)?;
489 self.headers.encode_varint(&mut out)?;
490 let len: i64 = out.len() as i64;
491 trace!("record encode as {} bytes", len);
492 len.encode_varint(dest)?;
493 dest.put_slice(&out);
494 Ok(())
495 }
496}
497
498impl<B> Decoder for Record<B>
499where
500 B: Decoder,
501{
502 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
503 where
504 T: Buf,
505 {
506 trace!("decoding record");
507 let mut len: i64 = 0;
508 len.decode_varint(src)?;
509
510 trace!("record contains: {} bytes", len);
511
512 if (src.remaining() as i64) < len {
513 return Err(Error::new(
514 ErrorKind::UnexpectedEof,
515 "not enough for record",
516 ));
517 }
518 self.preamble.decode(src, version)?;
519 trace!("offset delta: {}", self.preamble.offset_delta);
520 self.key.decode(src, version)?;
521 self.value.decode(src, version)?;
522 self.headers.decode_varint(src)?;
523
524 Ok(())
525 }
526}
527
528use Record as DefaultRecord;
529
530pub struct ConsumerRecord<B = DefaultRecord> {
532 pub offset: i64,
534 pub partition: i32,
536 pub record: B,
538 pub(crate) timestamp_base: Timestamp,
540}
541
542impl<B> ConsumerRecord<B> {
543 pub fn offset(&self) -> i64 {
545 self.offset
546 }
547
548 pub fn partition(&self) -> i32 {
550 self.partition
551 }
552
553 pub fn into_inner(self) -> B {
555 self.record
556 }
557
558 pub fn inner(&self) -> &B {
560 &self.record
561 }
562}
563
564impl ConsumerRecord<DefaultRecord> {
565 pub fn key(&self) -> Option<&[u8]> {
567 self.record.key().map(|it| it.as_ref())
568 }
569
570 pub fn value(&self) -> &[u8] {
572 self.record.value().as_ref()
573 }
574 pub fn timestamp(&self) -> Timestamp {
576 if self.timestamp_base <= 0 {
577 NO_TIMESTAMP
578 } else {
579 self.timestamp_base + self.record.timestamp_delta()
580 }
581 }
582}
583
584impl AsRef<[u8]> for ConsumerRecord<DefaultRecord> {
585 fn as_ref(&self) -> &[u8] {
586 self.value()
587 }
588}
589
#[cfg(test)]
mod test {
    use super::*;
    use std::io::Cursor;
    use std::io::Error as IoError;

    use crate::core::Decoder;
    use crate::core::Encoder;
    use crate::record::Record;

    /// Decodes a hand-written record and checks that re-encoding it yields the
    /// same size, and that the offset delta and value come out as expected.
    #[test]
    fn test_decode_encode_record() -> Result<(), IoError> {
        // The three value bytes are 0x64 0x6f 0x67.
        let data = [0x12, 0x0, 0x0, 0x2, 0x0, 0x6, 0x64, 0x6f, 0x67, 0x0];

        let record = Record::<RecordData>::decode_from(&mut Cursor::new(&data), 0)?;
        assert_eq!(record.as_bytes(0)?.len(), data.len());

        assert_eq!(record.write_size(0), data.len());
        println!("offset_delta: {:?}", record.get_offset_delta());
        assert_eq!(record.get_offset_delta(), 1);

        let payload = record.value.as_ref();
        assert_eq!(payload.len(), 3);
        assert_eq!(payload[0], 0x64);

        Ok(())
    }

    /// A record set whose last batch is cut short must decode to the batches
    /// that are still complete.
    #[test]
    fn test_decode_batch_truncation() {
        use super::RecordSet;
        use crate::batch::Batch;
        use crate::record::Record;

        /// One batch holding a single four-byte record.
        fn create_batch() -> Batch {
            let mut batch = Batch::default();
            batch.add_record(Record::new(vec![0x74, 0x65, 0x73, 0x74]));
            batch
        }

        let batches = RecordSet::default()
            .add(create_batch())
            .add(create_batch())
            .add(create_batch());

        const TRUNCATED: usize = 10;

        let mut bytes = batches.as_bytes(0).expect("bytes");

        let original_len = bytes.len();
        // Drop the last TRUNCATED bytes.
        let _ = bytes.split_off(original_len - TRUNCATED);
        // Separate the 4-byte length prefix from the remaining body.
        let body = bytes.split_off(4);
        // Length prefix matching the shortened body.
        let new_len = (original_len - TRUNCATED - 4) as i32;

        let mut out = vec![];
        new_len.encode(&mut out, 0).expect("encoding");
        out.extend_from_slice(&body);

        assert_eq!(out.len(), original_len - TRUNCATED);

        println!("decoding...");
        let decoded_batches =
            RecordSet::<MemoryRecords>::decode_from(&mut Cursor::new(out), 0).expect("decoding");
        assert_eq!(decoded_batches.batches.len(), 2);
    }

    /// A key/value record survives an encode/decode round trip.
    #[test]
    fn test_key_value_encoding() {
        let record = Record::new_key_value("KKKKKKKKKK".to_string(), "VVVVVVVVVV".to_string());

        let mut encoded = Vec::new();
        record.encode(&mut encoded, 0).unwrap();
        let decoded = Record::<RecordData>::decode_from(&mut Cursor::new(encoded), 0).unwrap();

        let original_key = record.key.unwrap();
        let round_trip_key = decoded.key.unwrap();
        assert_eq!(original_key.as_ref(), round_trip_key.as_ref());
        assert_eq!(record.value.as_ref(), decoded.value.as_ref());
    }

    /// A record stored in `./tests/test_old_record_empty_key.bin` decodes to a
    /// value with no key.
    #[test]
    fn test_decode_old_record_empty_key() {
        let old_encoded = std::fs::read("./tests/test_old_record_empty_key.bin").unwrap();
        let decoded = Record::<RecordData>::decode_from(&mut Cursor::new(old_encoded), 0).unwrap();

        let text = std::str::from_utf8(decoded.value.0.as_ref()).unwrap();
        assert_eq!(text, "VVVVVVVVVV");
        assert!(decoded.key.is_none());
    }

    /// Both `NO_TIMESTAMP` and a zero base timestamp report `NO_TIMESTAMP`.
    #[test]
    fn test_consumer_record_no_timestamp() {
        let without_timestamp = ConsumerRecord::<Record<RecordData>> {
            timestamp_base: NO_TIMESTAMP,
            offset: 0,
            partition: 0,
            record: Default::default(),
        };
        assert_eq!(without_timestamp.timestamp(), NO_TIMESTAMP);

        let zero_base = ConsumerRecord::<Record<RecordData>> {
            timestamp_base: 0,
            offset: 0,
            partition: 0,
            record: Default::default(),
        };
        assert_eq!(zero_base.timestamp(), NO_TIMESTAMP);
    }

    /// The reported timestamp is the base timestamp plus the record's delta.
    #[test]
    fn test_consumer_record_timestamp() {
        let no_delta = ConsumerRecord::<Record<RecordData>> {
            timestamp_base: 1_000_000_000,
            offset: 0,
            partition: 0,
            record: Default::default(),
        };
        assert_eq!(no_delta.timestamp(), 1_000_000_000);

        let mut memory_record = Record::<RecordData>::default();
        memory_record.preamble.timestamp_delta = 800;
        let with_delta = ConsumerRecord::<Record<RecordData>> {
            timestamp_base: 1_000_000_000,
            record: memory_record,
            offset: 0,
            partition: 0,
        };
        assert_eq!(with_delta.timestamp(), 1_000_000_800);
    }
}
763
764#[cfg(feature = "file")]
765pub use file::*;
766use crate::bytes::{Bytes, BytesMut};
767
/// File-backed record sets, available with the `file` feature.
#[cfg(feature = "file")]
mod file {

    use std::fmt;
    use std::io::Error as IoError;
    use std::io::ErrorKind;

    use tracing::trace;
    use bytes::BufMut;
    use bytes::BytesMut;

    use fluvio_future::file_slice::AsyncFileSlice;
    use crate::core::bytes::Buf;
    use crate::core::Decoder;
    use crate::core::Encoder;
    use crate::core::Version;
    use crate::store::FileWrite;
    use crate::store::StoreValue;

    /// A record set represented by an `AsyncFileSlice` instead of in-memory
    /// batches.
    #[derive(Default, Debug)]
    pub struct FileRecordSet(AsyncFileSlice);

    /// Displays the position and length of the slice.
    impl fmt::Display for FileRecordSet {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "pos: {} len: {}", self.position(), self.len())
        }
    }

    impl FileRecordSet {
        /// Position reported by the underlying slice.
        pub fn position(&self) -> u64 {
            self.0.position()
        }

        /// Length reported by the underlying slice, as `usize`.
        pub fn len(&self) -> usize {
            self.0.len() as usize
        }

        /// Returns `true` when the underlying slice has length zero.
        pub fn is_empty(&self) -> bool {
            self.len() == 0
        }

        /// Returns a clone of the underlying slice.
        pub fn raw_slice(&self) -> AsyncFileSlice {
            self.0.clone()
        }
    }

    impl From<AsyncFileSlice> for FileRecordSet {
        fn from(slice: AsyncFileSlice) -> Self {
            Self(slice)
        }
    }

    impl Encoder for FileRecordSet {
        /// Slice length plus 4 bytes for the length prefix.
        fn write_size(&self, _version: Version) -> usize {
            self.len() + 4
        }

        /// Encodes the record set into an in-memory buffer.
        ///
        /// Only an empty slice can be encoded this way: it is written as a
        /// zero `u32` length. File contents are emitted through
        /// `FileWrite::file_encode` instead.
        ///
        /// # Errors
        ///
        /// Returns `ErrorKind::InvalidInput` when the slice is not empty.
        fn encode<T>(&self, src: &mut T, version: Version) -> Result<(), IoError>
        where
            T: BufMut,
        {
            if self.is_empty() {
                let len: u32 = 0;
                len.encode(src, version)
            } else {
                Err(IoError::new(
                    ErrorKind::InvalidInput,
                    format!("len {} is not zero", self.len()),
                ))
            }
        }
    }

    impl Decoder for FileRecordSet {
        /// Not supported: always panics via `unimplemented!`.
        fn decode<T>(&mut self, _src: &mut T, _version: Version) -> Result<(), IoError>
        where
            T: Buf,
        {
            unimplemented!("file slice cannot be decoded in the ButMut")
        }
    }

    impl FileWrite for FileRecordSet {
        /// Emits the length prefix and the file slice as separate store values.
        ///
        /// The slice length is encoded as an `i32` into `dest`; everything
        /// then held in `dest` is moved into `data` as a `StoreValue::Bytes`
        /// chunk, followed by a `StoreValue::FileSlice` for the slice itself.
        fn file_encode(
            &self,
            dest: &mut BytesMut,
            data: &mut Vec<StoreValue>,
            version: Version,
        ) -> Result<(), IoError> {
            let len: i32 = self.len() as i32;
            trace!("KfFileRecordSet encoding file slice len: {}", len);
            len.encode(dest, version)?;
            // Drain `dest` so the bytes written so far precede the file slice.
            let bytes = dest.split_to(dest.len()).freeze();
            data.push(StoreValue::Bytes(bytes));
            data.push(StoreValue::FileSlice(self.raw_slice()));
            Ok(())
        }
    }
}