fluvio_dataplane_protocol/
record.rs

1use std::fmt;
2use std::fmt::Debug;
3use std::fmt::Display;
4use std::io::Error;
5use std::io::ErrorKind;
6use std::str::Utf8Error;
7
8use content_inspector::{inspect, ContentType};
9use tracing::{trace, warn};
10use once_cell::sync::Lazy;
11
12use bytes::Buf;
13use bytes::BufMut;
14
15use crate::batch::BatchRecords;
16use crate::batch::MemoryRecords;
17use crate::batch::NO_TIMESTAMP;
18use crate::batch::RawRecords;
19use crate::core::{Encoder, Decoder};
20use crate::core::DecoderVarInt;
21use crate::core::EncoderVarInt;
22use crate::core::Version;
23
24use crate::batch::Batch;
25use crate::Offset;
26
27use fluvio_compression::CompressionError;
28use fluvio_types::Timestamp;
29
30/// maximum text to display
31static MAX_STRING_DISPLAY: Lazy<usize> = Lazy::new(|| {
32    let var_value = std::env::var("FLV_MAX_STRING_DISPLAY").unwrap_or_default();
33    var_value.parse().unwrap_or(16384)
34});
35
36/// A key for determining which partition a record should be sent to.
37///
38/// This type is used to support conversions from any other type that
39/// may be converted to a `Vec<u8>`, while still allowing the ability
40/// to explicitly state that a record may have no key (`RecordKey::NULL`).
41///
42/// # Examples
43///
44/// ```
45/// # use fluvio_dataplane_protocol::record::RecordKey;
46/// let key = RecordKey::NULL;
47/// let key: RecordKey = "Hello, world!".into();
48/// let key: RecordKey = String::from("Hello, world!").into();
49/// let key: RecordKey = vec![1, 2, 3, 4].into();
50/// ```
51pub struct RecordKey(RecordKeyInner);
52
53impl RecordKey {
54    pub const NULL: Self = Self(RecordKeyInner::Null);
55
56    fn into_option(self) -> Option<RecordData> {
57        match self.0 {
58            RecordKeyInner::Key(key) => Some(key),
59            RecordKeyInner::Null => None,
60        }
61    }
62
63    #[doc(hidden)]
64    pub fn from_option(key: Option<RecordData>) -> Self {
65        let inner = match key {
66            Some(key) => RecordKeyInner::Key(key),
67            None => RecordKeyInner::Null,
68        };
69        Self(inner)
70    }
71}
72
73enum RecordKeyInner {
74    Null,
75    Key(RecordData),
76}
77
78impl<K: Into<Vec<u8>>> From<K> for RecordKey {
79    fn from(k: K) -> Self {
80        Self(RecordKeyInner::Key(RecordData::from(k)))
81    }
82}
83
84/// A type containing the data contents of a Record.
85///
86/// The `RecordData` type provides useful conversions for
87/// constructing it from any type that may convert into a `Vec<u8>`.
88/// This is the basis for flexible APIs that allow users to supply
89/// various different argument types as record contents. See
90/// [the Producer API] as an example.
91///
92/// [the Producer API]: https://docs.rs/fluvio/producer/TopicProducer::send
93#[derive(Clone, Default, PartialEq)]
94pub struct RecordData(Bytes);
95
96impl RecordData {
97    pub fn len(&self) -> usize {
98        self.0.len()
99    }
100
101    /// Check if value is binary content
102    pub fn is_binary(&self) -> bool {
103        matches!(inspect(&self.0), ContentType::BINARY)
104    }
105
106    /// Describe value - return text, binary, or 0 bytes
107    pub fn describe(&self) -> String {
108        if self.is_binary() {
109            format!("binary: ({} bytes)", self.len())
110        } else {
111            format!("text: '{}'", self)
112        }
113    }
114
115    // as string slice
116    pub fn as_str(&self) -> Result<&str, Utf8Error> {
117        std::str::from_utf8(self.as_ref())
118    }
119}
120
121impl<V: Into<Vec<u8>>> From<V> for RecordData {
122    fn from(value: V) -> Self {
123        Self(Bytes::from(value.into()))
124    }
125}
126
127impl AsRef<[u8]> for RecordData {
128    fn as_ref(&self) -> &[u8] {
129        self.0.as_ref()
130    }
131}
132
133impl Debug for RecordData {
134    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
135        let value = &self.0;
136        if matches!(inspect(value), ContentType::BINARY) {
137            write!(f, "values binary: ({} bytes)", self.len())
138        } else if value.len() < *MAX_STRING_DISPLAY {
139            write!(f, "{}", String::from_utf8_lossy(value))
140        } else {
141            write!(
142                f,
143                "{}...",
144                String::from_utf8_lossy(&value[0..*MAX_STRING_DISPLAY])
145            )
146        }
147    }
148}
149
150impl Display for RecordData {
151    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
152        let value = &self.0;
153        if matches!(inspect(value), ContentType::BINARY) {
154            write!(f, "binary: ({} bytes)", self.len())
155        } else if value.len() < *MAX_STRING_DISPLAY {
156            write!(f, "{}", String::from_utf8_lossy(value))
157        } else {
158            write!(
159                f,
160                "{}...",
161                String::from_utf8_lossy(&value[0..*MAX_STRING_DISPLAY])
162            )
163        }
164    }
165}
166
167impl Encoder for RecordData {
168    fn write_size(&self, version: Version) -> usize {
169        let len = self.0.len() as i64;
170        self.0.iter().fold(len.var_write_size(), |sum, val| {
171            sum + val.write_size(version)
172        })
173    }
174
175    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
176    where
177        T: BufMut,
178    {
179        let len: i64 = self.0.len() as i64;
180        len.encode_varint(dest)?;
181        for v in self.0.iter() {
182            v.encode(dest, version)?;
183        }
184        Ok(())
185    }
186}
187
188impl Decoder for RecordData {
189    fn decode<T>(&mut self, src: &mut T, _: Version) -> Result<(), Error>
190    where
191        T: Buf,
192    {
193        trace!("decoding default asyncbuffer");
194
195        let mut len: i64 = 0;
196        len.decode_varint(src)?;
197        let len = len as usize;
198
199        // Take `len` bytes from `src` and put them into a new BytesMut buffer
200        let slice = src.take(len);
201        let mut bytes = BytesMut::with_capacity(len);
202        bytes.put(slice);
203
204        // Replace the inner Bytes buffer of this RecordData
205        self.0 = bytes.freeze();
206        Ok(())
207    }
208}
209
210/// Represents sets of batches in storage
211//  It is written consequently with len as prefix
212#[derive(Default, Debug)]
213pub struct RecordSet<R = MemoryRecords> {
214    pub batches: Vec<Batch<R>>,
215}
216
217impl TryFrom<RecordSet> for RecordSet<RawRecords> {
218    type Error = CompressionError;
219    fn try_from(set: RecordSet) -> Result<Self, Self::Error> {
220        let batches: Result<Vec<_>, _> = set
221            .batches
222            .into_iter()
223            .map(|batch| batch.try_into())
224            .collect();
225        Ok(Self { batches: batches? })
226    }
227}
228
229impl fmt::Display for RecordSet {
230    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
231        write!(f, "{} batches", self.batches.len())
232    }
233}
234
235impl<R: BatchRecords> RecordSet<R> {
236    pub fn add(mut self, batch: Batch<R>) -> Self {
237        self.batches.push(batch);
238        self
239    }
240
241    /// this is next offset to be fetched
242    pub fn last_offset(&self) -> Option<Offset> {
243        self.batches
244            .last()
245            .map(|batch| batch.computed_last_offset())
246    }
247
248    /// total records
249    pub fn total_records(&self) -> usize {
250        self.batches
251            .iter()
252            .map(|batches| batches.records_len())
253            .sum()
254    }
255
256    /// return base offset
257    pub fn base_offset(&self) -> Offset {
258        self.batches
259            .first()
260            .map(|batches| batches.base_offset)
261            .unwrap_or_else(|| -1)
262    }
263}
264
265impl<R: BatchRecords> Decoder for RecordSet<R> {
266    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
267    where
268        T: Buf,
269    {
270        trace!(len = src.remaining(), "raw buffer size");
271        let mut len: i32 = 0;
272        len.decode(src, version)?;
273        trace!(len, "Record sets decoded content");
274
275        if src.remaining() < len as usize {
276            return Err(Error::new(
277                ErrorKind::UnexpectedEof,
278                format!(
279                    "expected message len: {} but founded {}",
280                    len,
281                    src.remaining()
282                ),
283            ));
284        }
285
286        let mut buf = src.take(len as usize);
287
288        let mut count = 0;
289        while buf.remaining() > 0 {
290            trace!(count, remaining = buf.remaining(), "decoding batches");
291            let mut batch = Batch::default();
292            match batch.decode(&mut buf, version) {
293                Ok(_) => self.batches.push(batch),
294                Err(err) => match err.kind() {
295                    ErrorKind::UnexpectedEof => {
296                        warn!(
297                            len,
298                            remaining = buf.remaining(),
299                            version,
300                            count,
301                            "not enough bytes for decoding batch from recordset"
302                        );
303                        return Ok(());
304                    }
305                    _ => {
306                        warn!("problem decoding batch: {}", err);
307                        return Ok(());
308                    }
309                },
310            }
311            count += 1;
312        }
313
314        Ok(())
315    }
316}
317
318impl<R: BatchRecords> Encoder for RecordSet<R> {
319    fn write_size(&self, version: Version) -> usize {
320        self.batches
321            .iter()
322            .fold(4, |sum, val| sum + val.write_size(version))
323    }
324
325    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
326    where
327        T: BufMut,
328    {
329        trace!("Record set encoding");
330
331        let mut out: Vec<u8> = Vec::new();
332
333        for batch in &self.batches {
334            trace!("encoding batch..");
335            batch.encode(&mut out, version)?;
336        }
337
338        let length: i32 = out.len() as i32;
339        trace!("Record Set encode len: {}", length);
340        length.encode(dest, version)?;
341
342        dest.put_slice(&out);
343        Ok(())
344    }
345}
346
347impl<R: Clone> Clone for RecordSet<R> {
348    fn clone(&self) -> Self {
349        Self {
350            batches: self.batches.clone(),
351        }
352    }
353}
354
355#[derive(Decoder, Default, Encoder, Debug, Clone)]
356pub struct RecordHeader {
357    attributes: i8,
358    #[varint]
359    timestamp_delta: Timestamp,
360    #[varint]
361    offset_delta: Offset,
362}
363
364impl RecordHeader {
365    pub fn set_offset_delta(&mut self, delta: Offset) {
366        self.offset_delta = delta;
367    }
368
369    pub fn offset_delta(&self) -> Offset {
370        self.offset_delta
371    }
372
373    #[cfg(feature = "memory_batch")]
374    pub(crate) fn set_timestamp_delta(&mut self, delta: Timestamp) {
375        self.timestamp_delta = delta;
376    }
377}
378
379#[derive(Default, Clone)]
380pub struct Record<B = RecordData> {
381    pub preamble: RecordHeader,
382    pub key: Option<B>,
383    pub value: B,
384    pub headers: i64,
385}
386
387impl<B: Default> Record<B> {
388    pub fn get_offset_delta(&self) -> Offset {
389        self.preamble.offset_delta
390    }
391
392    /// add offset delta with new relative base offset
393    pub fn add_base_offset(&mut self, relative_base_offset: Offset) {
394        self.preamble.offset_delta += relative_base_offset;
395    }
396
397    /// Returns a reference to the inner value
398    pub fn value(&self) -> &B {
399        &self.value
400    }
401
402    /// Returns a reference to the inner key if it exists
403    pub fn key(&self) -> Option<&B> {
404        self.key.as_ref()
405    }
406
407    /// Consumes this record, returning the inner value
408    pub fn into_value(self) -> B {
409        self.value
410    }
411
412    /// Consumes this record, returning the inner key if it exists
413    pub fn into_key(self) -> Option<B> {
414        self.key
415    }
416}
417
418impl Record {
419    pub fn new<V>(value: V) -> Self
420    where
421        V: Into<RecordData>,
422    {
423        Record {
424            value: value.into(),
425            ..Default::default()
426        }
427    }
428
429    pub fn new_key_value<K, V>(key: K, value: V) -> Self
430    where
431        K: Into<RecordKey>,
432        V: Into<RecordData>,
433    {
434        let key = key.into().into_option();
435        Record {
436            key,
437            value: value.into(),
438            ..Default::default()
439        }
440    }
441
442    pub(crate) fn timestamp_delta(&self) -> Timestamp {
443        self.preamble.timestamp_delta
444    }
445}
446
447impl<K, V> From<(K, V)> for Record
448where
449    K: Into<RecordKey>,
450    V: Into<RecordData>,
451{
452    fn from((key, value): (K, V)) -> Self {
453        Self::new_key_value(key, value)
454    }
455}
456
457impl<B: Debug> Debug for Record<B> {
458    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
459        f.debug_struct("Record")
460            .field("preamble", &self.preamble)
461            .field("key", &self.key)
462            .field("value", &self.value)
463            .field("headers", &self.headers)
464            .finish()
465    }
466}
467
468impl<B> Encoder for Record<B>
469where
470    B: Encoder + Default,
471{
472    fn write_size(&self, version: Version) -> usize {
473        let inner_size = self.preamble.write_size(version)
474            + self.key.write_size(version)
475            + self.value.write_size(version)
476            + self.headers.var_write_size();
477        let len: i64 = inner_size as i64;
478        len.var_write_size() + inner_size
479    }
480
481    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
482    where
483        T: BufMut,
484    {
485        let mut out: Vec<u8> = Vec::new();
486        self.preamble.encode(&mut out, version)?;
487        self.key.encode(&mut out, version)?;
488        self.value.encode(&mut out, version)?;
489        self.headers.encode_varint(&mut out)?;
490        let len: i64 = out.len() as i64;
491        trace!("record encode as {} bytes", len);
492        len.encode_varint(dest)?;
493        dest.put_slice(&out);
494        Ok(())
495    }
496}
497
498impl<B> Decoder for Record<B>
499where
500    B: Decoder,
501{
502    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
503    where
504        T: Buf,
505    {
506        trace!("decoding record");
507        let mut len: i64 = 0;
508        len.decode_varint(src)?;
509
510        trace!("record contains: {} bytes", len);
511
512        if (src.remaining() as i64) < len {
513            return Err(Error::new(
514                ErrorKind::UnexpectedEof,
515                "not enough for record",
516            ));
517        }
518        self.preamble.decode(src, version)?;
519        trace!("offset delta: {}", self.preamble.offset_delta);
520        self.key.decode(src, version)?;
521        self.value.decode(src, version)?;
522        self.headers.decode_varint(src)?;
523
524        Ok(())
525    }
526}
527
528use Record as DefaultRecord;
529
530/// Record that can be used by Consumer which needs access to metadata
531pub struct ConsumerRecord<B = DefaultRecord> {
532    /// The offset of this Record into its partition
533    pub offset: i64,
534    /// The partition where this Record is stored
535    pub partition: i32,
536    /// The Record contents
537    pub record: B,
538    /// Timestamp base of batch in which the records is present
539    pub(crate) timestamp_base: Timestamp,
540}
541
542impl<B> ConsumerRecord<B> {
543    /// The offset from the initial offset for a given stream.
544    pub fn offset(&self) -> i64 {
545        self.offset
546    }
547
548    /// The partition where this Record is stored.
549    pub fn partition(&self) -> i32 {
550        self.partition
551    }
552
553    /// Returns the inner representation of the Record
554    pub fn into_inner(self) -> B {
555        self.record
556    }
557
558    /// Returns a ref to the inner representation of the Record
559    pub fn inner(&self) -> &B {
560        &self.record
561    }
562}
563
564impl ConsumerRecord<DefaultRecord> {
565    /// Returns the contents of this Record's key, if it exists
566    pub fn key(&self) -> Option<&[u8]> {
567        self.record.key().map(|it| it.as_ref())
568    }
569
570    /// Returns the contents of this Record's value
571    pub fn value(&self) -> &[u8] {
572        self.record.value().as_ref()
573    }
574    /// Return the timestamp of the Record
575    pub fn timestamp(&self) -> Timestamp {
576        if self.timestamp_base <= 0 {
577            NO_TIMESTAMP
578        } else {
579            self.timestamp_base + self.record.timestamp_delta()
580        }
581    }
582}
583
584impl AsRef<[u8]> for ConsumerRecord<DefaultRecord> {
585    fn as_ref(&self) -> &[u8] {
586        self.value()
587    }
588}
589
590#[cfg(test)]
591mod test {
592    use super::*;
593    use std::io::Cursor;
594    use std::io::Error as IoError;
595
596    use crate::core::Decoder;
597    use crate::core::Encoder;
598    use crate::record::Record;
599
600    #[test]
601    fn test_decode_encode_record() -> Result<(), IoError> {
602        /* Below is how you generate the vec<u8> for the `data` variable below.
603        let mut record = Record::from(String::from("dog"));
604        record.preamble.set_offset_delta(1);
605        let mut out = vec![];
606        record.encode(&mut out, 0)?;
607        println!("ENCODED: {:#x?}", out);
608        */
609        let data = [0x12, 0x0, 0x0, 0x2, 0x0, 0x6, 0x64, 0x6f, 0x67, 0x0];
610
611        let record = Record::<RecordData>::decode_from(&mut Cursor::new(&data), 0)?;
612        assert_eq!(record.as_bytes(0)?.len(), data.len());
613
614        assert_eq!(record.write_size(0), data.len());
615        println!("offset_delta: {:?}", record.get_offset_delta());
616        assert_eq!(record.get_offset_delta(), 1);
617
618        let value = record.value.as_ref();
619        assert_eq!(value.len(), 3);
620        assert_eq!(value[0], 0x64);
621
622        Ok(())
623    }
624
625    /// test decoding of records when one of the batch was truncated
626    #[test]
627    fn test_decode_batch_truncation() {
628        use super::RecordSet;
629        use crate::batch::Batch;
630        use crate::record::Record;
631
632        fn create_batch() -> Batch {
633            let value = vec![0x74, 0x65, 0x73, 0x74];
634            let record = Record::new(value);
635            let mut batch = Batch::default();
636            batch.add_record(record);
637            batch
638        }
639
640        // add 3 batches
641        let batches = RecordSet::default()
642            .add(create_batch())
643            .add(create_batch())
644            .add(create_batch());
645
646        const TRUNCATED: usize = 10;
647
648        let mut bytes = batches.as_bytes(0).expect("bytes");
649
650        let original_len = bytes.len();
651        let _ = bytes.split_off(original_len - TRUNCATED); // truncate record sets
652        let body = bytes.split_off(4); // split off body so we can manipulate len
653
654        let new_len = (original_len - TRUNCATED - 4) as i32;
655        let mut out = vec![];
656        new_len.encode(&mut out, 0).expect("encoding");
657        out.extend_from_slice(&body);
658
659        assert_eq!(out.len(), original_len - TRUNCATED);
660
661        println!("decoding...");
662        let decoded_batches =
663            RecordSet::<MemoryRecords>::decode_from(&mut Cursor::new(out), 0).expect("decoding");
664        assert_eq!(decoded_batches.batches.len(), 2);
665    }
666
667    #[test]
668    fn test_key_value_encoding() {
669        let key = "KKKKKKKKKK".to_string();
670        let value = "VVVVVVVVVV".to_string();
671        let record = Record::new_key_value(key, value);
672
673        let mut encoded = Vec::new();
674        record.encode(&mut encoded, 0).unwrap();
675        let decoded = Record::<RecordData>::decode_from(&mut Cursor::new(encoded), 0).unwrap();
676
677        let record_key = record.key.unwrap();
678        let decoded_key = decoded.key.unwrap();
679        assert_eq!(record_key.as_ref(), decoded_key.as_ref());
680        assert_eq!(record.value.as_ref(), decoded.value.as_ref());
681    }
682
683    // Test Specification:
684    //
685    // A record was encoded and written to a file, using the following code:
686    //
687    // ```rust
688    // use fluvio_dataplane_protocol::record::{Record, DefaultAsyncBuffer};
689    // use fluvio_protocol::Encoder;
690    // let value = "VVVVVVVVVV".to_string();
691    // let record = Record {
692    //     key: DefaultAsyncBuffer::default(),
693    //     value: DefaultAsyncBuffer::new(value.into_bytes()),
694    //     ..Default::default()
695    // };
696    // let mut encoded = Vec::new();
697    // record.encode(&mut encoded, 0);
698    // ```
699    //
700    // This was back when records defined keys as `key: B` rather than `key: Option<B>`.
701    //
702    // It just so happens that our public API never allowed any records to be sent with
703    // any contents in the `key` field, so what was sent over the wire was a buffer whose
704    // length was zero, encoded as a single zero `0x00` (for "length-zero buffer").
705    //
706    // In the new `key: Option<B>` encoding, a key is encoded with a tag for
707    // Some or None, with 0x00 representing None and 0x01 representing Some.
708    // So, when reading old records, the 0x00 "length encoding" will be re-interpreted
709    // as the 0x00 "None encoding". Since all old keys were empty, this should work for
710    // all old records _in practice_. This will not work if any record is decoded which
711    // artificially was given contents in the key field.
712    #[test]
713    fn test_decode_old_record_empty_key() {
714        let old_encoded = std::fs::read("./tests/test_old_record_empty_key.bin").unwrap();
715        let decoded = Record::<RecordData>::decode_from(&mut Cursor::new(old_encoded), 0).unwrap();
716        assert_eq!(
717            std::str::from_utf8(decoded.value.0.as_ref()).unwrap(),
718            "VVVVVVVVVV"
719        );
720        assert!(decoded.key.is_none());
721    }
722
723    #[test]
724    fn test_consumer_record_no_timestamp() {
725        let record = ConsumerRecord::<Record<RecordData>> {
726            timestamp_base: NO_TIMESTAMP,
727            offset: 0,
728            partition: 0,
729            record: Default::default(),
730        };
731
732        assert_eq!(record.timestamp(), NO_TIMESTAMP);
733        let record = ConsumerRecord::<Record<RecordData>> {
734            timestamp_base: 0,
735            offset: 0,
736            partition: 0,
737            record: Default::default(),
738        };
739        assert_eq!(record.timestamp(), NO_TIMESTAMP);
740    }
741
742    #[test]
743    fn test_consumer_record_timestamp() {
744        let record = ConsumerRecord::<Record<RecordData>> {
745            timestamp_base: 1_000_000_000,
746            offset: 0,
747            partition: 0,
748            record: Default::default(),
749        };
750
751        assert_eq!(record.timestamp(), 1_000_000_000);
752        let mut memory_record = Record::<RecordData>::default();
753        memory_record.preamble.timestamp_delta = 800;
754        let record = ConsumerRecord::<Record<RecordData>> {
755            timestamp_base: 1_000_000_000,
756            record: memory_record,
757            offset: 0,
758            partition: 0,
759        };
760        assert_eq!(record.timestamp(), 1_000_000_800);
761    }
762}
763
764#[cfg(feature = "file")]
765pub use file::*;
766use crate::bytes::{Bytes, BytesMut};
767
768#[cfg(feature = "file")]
769mod file {
770
771    use std::fmt;
772    use std::io::Error as IoError;
773    use std::io::ErrorKind;
774
775    use tracing::trace;
776    use bytes::BufMut;
777    use bytes::BytesMut;
778
779    use fluvio_future::file_slice::AsyncFileSlice;
780    use crate::core::bytes::Buf;
781    use crate::core::Decoder;
782    use crate::core::Encoder;
783    use crate::core::Version;
784    use crate::store::FileWrite;
785    use crate::store::StoreValue;
786
787    #[derive(Default, Debug)]
788    pub struct FileRecordSet(AsyncFileSlice);
789
790    impl fmt::Display for FileRecordSet {
791        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
792            write!(f, "pos: {} len: {}", self.position(), self.len())
793        }
794    }
795
796    impl FileRecordSet {
797        pub fn position(&self) -> u64 {
798            self.0.position()
799        }
800
801        pub fn len(&self) -> usize {
802            self.0.len() as usize
803        }
804
805        pub fn raw_slice(&self) -> AsyncFileSlice {
806            self.0.clone()
807        }
808    }
809
810    impl From<AsyncFileSlice> for FileRecordSet {
811        fn from(slice: AsyncFileSlice) -> Self {
812            Self(slice)
813        }
814    }
815
816    impl Encoder for FileRecordSet {
817        fn write_size(&self, _version: Version) -> usize {
818            self.len() + 4 // include header
819        }
820
821        fn encode<T>(&self, src: &mut T, version: Version) -> Result<(), IoError>
822        where
823            T: BufMut,
824        {
825            // can only encode zero length
826            if self.len() == 0 {
827                let len: u32 = 0;
828                len.encode(src, version)
829            } else {
830                Err(IoError::new(
831                    ErrorKind::InvalidInput,
832                    format!("len {} is not zeo", self.len()),
833                ))
834            }
835        }
836    }
837
838    impl Decoder for FileRecordSet {
839        fn decode<T>(&mut self, _src: &mut T, _version: Version) -> Result<(), IoError>
840        where
841            T: Buf,
842        {
843            unimplemented!("file slice cannot be decoded in the ButMut")
844        }
845    }
846
847    impl FileWrite for FileRecordSet {
848        fn file_encode(
849            &self,
850            dest: &mut BytesMut,
851            data: &mut Vec<StoreValue>,
852            version: Version,
853        ) -> Result<(), IoError> {
854            // write total len
855            let len: i32 = self.len() as i32;
856            trace!("KfFileRecordSet encoding file slice len: {}", len);
857            len.encode(dest, version)?;
858            let bytes = dest.split_to(dest.len()).freeze();
859            data.push(StoreValue::Bytes(bytes));
860            data.push(StoreValue::FileSlice(self.raw_slice()));
861            Ok(())
862        }
863    }
864}