Skip to main content

s2_common/record/
mod.rs

1mod batcher;
2mod command;
3mod encryption;
4mod envelope;
5mod fencing;
6mod iterator;
7mod metering;
8
9pub use batcher::{RecordBatch, RecordBatcher};
10use bytes::{Buf, BufMut, Bytes, BytesMut};
11pub use command::CommandRecord;
12use command::{CommandOp, CommandPayloadError};
13pub use encryption::{
14    EncryptedRecord, RecordDecryptionError, decrypt_stored_record, encrypt_record,
15};
16use enum_ordinalize::Ordinalize;
17pub use envelope::EnvelopeRecord;
18use envelope::HeaderValidationError;
19pub use fencing::{FencingToken, FencingTokenTooLongError, MAX_FENCING_TOKEN_LENGTH};
20pub use iterator::StoredRecordIterator;
21pub use metering::{Metered, MeteredExt, MeteredSize};
22
23use crate::deep_size::DeepSize;
24
25pub type SeqNum = u64;
26pub type NonZeroSeqNum = std::num::NonZeroU64;
27pub type Timestamp = u64;
28
29#[derive(Debug, PartialEq, Eq, Clone, Copy)]
30pub struct StreamPosition {
31    pub seq_num: SeqNum,
32    pub timestamp: Timestamp,
33}
34
35impl StreamPosition {
36    pub const MIN: StreamPosition = StreamPosition {
37        seq_num: SeqNum::MIN,
38        timestamp: Timestamp::MIN,
39    };
40}
41
42impl std::fmt::Display for StreamPosition {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(f, "{} @ {}", self.seq_num, self.timestamp)
45    }
46}
47
48impl DeepSize for StreamPosition {
49    fn deep_size(&self) -> usize {
50        self.seq_num.deep_size() + self.timestamp.deep_size()
51    }
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
55pub enum RecordDecodeError {
56    #[error("truncated: {0}")]
57    Truncated(&'static str),
58    #[error("invalid value [{0}]: {1}")]
59    InvalidValue(&'static str, &'static str),
60}
61
62#[derive(Debug, PartialEq, thiserror::Error)]
63pub enum RecordPartsError {
64    #[error("unknown command")]
65    UnknownCommand,
66    #[error("invalid `{0}` command: {1}")]
67    CommandPayload(CommandOp, CommandPayloadError),
68    #[error("invalid header: {0}")]
69    Header(#[from] HeaderValidationError),
70}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct Header {
74    pub name: Bytes,
75    pub value: Bytes,
76}
77
78impl DeepSize for Header {
79    fn deep_size(&self) -> usize {
80        self.name.len() + self.value.len()
81    }
82}
83
84#[derive(Clone, Copy, Debug, PartialEq, Ordinalize)]
85#[repr(u8)]
86pub enum RecordType {
87    Command = 1,
88    Envelope = 2,
89    EncryptedEnvelope = 3,
90}
91
92#[derive(Copy, Clone, Debug, PartialEq)]
93pub struct MagicByte {
94    pub record_type: RecordType,
95    pub metered_size_varlen: u8,
96}
97
98/// Read bytes to u32 in big-endian order.
99fn read_vint_u32_be(bytes: &[u8]) -> u32 {
100    if bytes.len() > size_of::<u32>() || bytes.is_empty() {
101        panic!("invalid variable int bytes = {} len", bytes.len())
102    }
103    let mut acc: u32 = 0;
104    for &byte in bytes {
105        acc = (acc << 8) | byte as u32;
106    }
107    acc
108}
109
110pub fn try_metered_size(record_bytes: &[u8]) -> Result<u32, &'static str> {
111    let magic_byte_u8 = *record_bytes.first().ok_or("byte range is empty")?;
112    let magic_byte = MagicByte::try_from(magic_byte_u8)?;
113    Ok(read_vint_u32_be(
114        record_bytes
115            .get(1..1 + magic_byte.metered_size_varlen as usize)
116            .ok_or("byte range doesn't include bytes for metered size")?,
117    ))
118}
119
120impl MeteredSize for Record {
121    fn metered_size(&self) -> usize {
122        match self {
123            Self::Command(command) => command.metered_size(),
124            Self::Envelope(envelope) => envelope.metered_size(),
125        }
126    }
127}
128
129impl TryFrom<u8> for MagicByte {
130    type Error = &'static str;
131
132    fn try_from(value: u8) -> Result<Self, Self::Error> {
133        let record_type =
134            RecordType::from_ordinal(value & 0b111).ok_or("invalid record type ordinal")?;
135        Ok(Self {
136            record_type,
137            metered_size_varlen: match (value >> 3) & 0b11 {
138                0 => 1u8,
139                1 => 2u8,
140                2 => 3u8,
141                _ => Err("invalid metered_size_varlen")?,
142            },
143        })
144    }
145}
146
147impl From<MagicByte> for u8 {
148    fn from(value: MagicByte) -> Self {
149        ((value.metered_size_varlen - 1) << 3) | value.record_type as u8
150    }
151}
152
153#[derive(Debug, PartialEq, Eq, Clone)]
154pub enum Record {
155    Command(CommandRecord),
156    Envelope(EnvelopeRecord),
157}
158
159impl DeepSize for Record {
160    fn deep_size(&self) -> usize {
161        match self {
162            Self::Command(c) => c.deep_size(),
163            Self::Envelope(e) => e.deep_size(),
164        }
165    }
166}
167
168impl Record {
169    pub fn try_from_parts(headers: Vec<Header>, body: Bytes) -> Result<Self, RecordPartsError> {
170        if headers.len() == 1 {
171            let header = &headers[0];
172            if header.name.is_empty() {
173                let op = CommandOp::from_id(header.value.as_ref())
174                    .ok_or(RecordPartsError::UnknownCommand)?;
175                let command_record = CommandRecord::try_from_parts(op, body.as_ref())
176                    .map_err(|e| RecordPartsError::CommandPayload(op, e))?;
177                return Ok(Self::Command(command_record));
178            }
179        }
180        let envelope = EnvelopeRecord::try_from_parts(headers, body)?;
181        Ok(Self::Envelope(envelope))
182    }
183
184    pub fn sequenced(self, position: StreamPosition) -> SequencedRecord {
185        Sequenced::new(position, self)
186    }
187
188    pub fn into_parts(self) -> (Vec<Header>, Bytes) {
189        match self {
190            Record::Envelope(e) => e.into_parts(),
191            Record::Command(c) => {
192                let op = c.op();
193                let header = Header {
194                    name: Bytes::new(),
195                    value: Bytes::from_static(op.to_id()),
196                };
197                (vec![header], c.payload())
198            }
199        }
200    }
201}
202
203#[derive(Debug, PartialEq, Eq, Clone)]
204pub enum StoredRecord {
205    Plaintext(Record),
206    Encrypted {
207        metered_size: usize,
208        record: EncryptedRecord,
209    },
210}
211
212impl StoredRecord {
213    pub(crate) fn encrypted(record: EncryptedRecord, metered_size: usize) -> Self {
214        Self::Encrypted {
215            metered_size,
216            record,
217        }
218    }
219
220    fn record_type(&self) -> RecordType {
221        match self {
222            Self::Plaintext(Record::Command(_)) => RecordType::Command,
223            Self::Plaintext(Record::Envelope(_)) => RecordType::Envelope,
224            Self::Encrypted { .. } => RecordType::EncryptedEnvelope,
225        }
226    }
227
228    fn encoded_body_size(&self) -> usize {
229        match self {
230            Self::Plaintext(Record::Command(record)) => record.encoded_size(),
231            Self::Plaintext(Record::Envelope(record)) => record.encoded_size(),
232            Self::Encrypted { record, .. } => record.encoded_size(),
233        }
234    }
235
236    fn encode_body_into(&self, buf: &mut impl BufMut) {
237        match self {
238            Self::Plaintext(Record::Command(record)) => record.encode_into(buf),
239            Self::Plaintext(Record::Envelope(record)) => record.encode_into(buf),
240            Self::Encrypted { record, .. } => record.encode_into(buf),
241        }
242    }
243
244    pub fn encryption_mode(&self) -> crate::encryption::EncryptionMode {
245        match self {
246            Self::Plaintext(_) => crate::encryption::EncryptionMode::Plain,
247            Self::Encrypted { record, .. } => record.algorithm().into(),
248        }
249    }
250}
251
252impl DeepSize for StoredRecord {
253    fn deep_size(&self) -> usize {
254        match self {
255            Self::Plaintext(record) => record.deep_size(),
256            Self::Encrypted {
257                metered_size,
258                record,
259            } => metered_size.deep_size() + record.deep_size(),
260        }
261    }
262}
263
264impl MeteredSize for StoredRecord {
265    fn metered_size(&self) -> usize {
266        match self {
267            Self::Plaintext(record) => record.metered_size(),
268            Self::Encrypted { metered_size, .. } => *metered_size,
269        }
270    }
271}
272
273impl From<Record> for StoredRecord {
274    fn from(value: Record) -> Self {
275        Self::Plaintext(value)
276    }
277}
278
279impl From<Record> for Metered<StoredRecord> {
280    fn from(value: Record) -> Self {
281        Self::from(StoredRecord::from(value))
282    }
283}
284
285pub fn decode_if_command_record(record: &[u8]) -> Result<Option<CommandRecord>, RecordDecodeError> {
286    if record.is_empty() {
287        return Err(RecordDecodeError::Truncated("MagicByte"));
288    }
289    let magic_byte = MagicByte::try_from(record[0])
290        .map_err(|msg| RecordDecodeError::InvalidValue("MagicByte", msg))?;
291    match magic_byte.record_type {
292        RecordType::Command => {
293            let offset = 1 + magic_byte.metered_size_varlen as usize;
294            if record.len() < offset {
295                return Err(RecordDecodeError::Truncated("MeteredSize"));
296            }
297            Ok(Some(CommandRecord::try_from(&record[offset..])?))
298        }
299        RecordType::Envelope | RecordType::EncryptedEnvelope => Ok(None),
300    }
301}
302
303pub trait Encodable {
304    fn to_bytes(&self) -> Bytes {
305        let expected_size = self.encoded_size();
306        let mut buf = BytesMut::with_capacity(expected_size);
307        self.encode_into(&mut buf);
308        assert_eq!(buf.len(), expected_size, "no reallocation");
309        buf.freeze()
310    }
311
312    fn encoded_size(&self) -> usize;
313
314    fn encode_into(&self, buf: &mut impl BufMut);
315}
316
317impl Encodable for Metered<&StoredRecord> {
318    fn encoded_size(&self) -> usize {
319        1 + self.magic_byte().metered_size_varlen as usize + self.encoded_body_size()
320    }
321
322    fn encode_into(&self, buf: &mut impl BufMut) {
323        let magic_byte = self.magic_byte();
324        buf.put_u8(magic_byte.into());
325        buf.put_uint(
326            self.metered_size() as u64,
327            magic_byte.metered_size_varlen as usize,
328        );
329        self.encode_body_into(buf);
330    }
331}
332
333#[derive(Debug, Clone, PartialEq, Eq)]
334pub struct Sequenced<T> {
335    position: StreamPosition,
336    inner: T,
337}
338
339impl<T> Sequenced<T> {
340    pub const fn new(position: StreamPosition, inner: T) -> Self {
341        Self { position, inner }
342    }
343
344    pub const fn position(&self) -> &StreamPosition {
345        &self.position
346    }
347
348    pub fn inner(&self) -> &T {
349        &self.inner
350    }
351
352    pub fn as_ref(&self) -> Sequenced<&T> {
353        Sequenced::new(self.position, &self.inner)
354    }
355
356    pub fn parts(&self) -> (StreamPosition, &T) {
357        (self.position, &self.inner)
358    }
359
360    pub fn into_parts(self) -> (StreamPosition, T) {
361        (self.position, self.inner)
362    }
363}
364
365pub type StoredSequencedBytes = Sequenced<Bytes>;
366pub type SequencedRecord = Sequenced<Record>;
367pub type StoredSequencedRecord = Sequenced<StoredRecord>;
368
369impl<T> MeteredSize for Sequenced<T>
370where
371    T: MeteredSize,
372{
373    fn metered_size(&self) -> usize {
374        self.inner.metered_size()
375    }
376}
377
378impl<T> DeepSize for Sequenced<T>
379where
380    T: DeepSize,
381{
382    fn deep_size(&self) -> usize {
383        self.position.deep_size() + self.inner.deep_size()
384    }
385}
386
387impl<T> Metered<T>
388where
389    T: MeteredSize,
390{
391    pub fn sequenced(self, position: StreamPosition) -> Metered<Sequenced<T>> {
392        Metered::with_size(
393            self.metered_size(),
394            Sequenced::new(position, self.into_inner()),
395        )
396    }
397}
398
399impl Metered<&StoredRecord> {
400    fn magic_byte(&self) -> MagicByte {
401        let metered_size = self.metered_size();
402        let metered_size_varlen = 8 - (metered_size.leading_zeros() / 8) as u8;
403        if metered_size_varlen > 3 {
404            panic!("illegal metered size varlen {metered_size} for record")
405        }
406        MagicByte {
407            record_type: self.record_type(),
408            metered_size_varlen,
409        }
410    }
411}
412
413impl TryFrom<Bytes> for Metered<StoredRecord> {
414    type Error = RecordDecodeError;
415
416    fn try_from(mut buf: Bytes) -> Result<Self, Self::Error> {
417        if buf.is_empty() {
418            return Err(RecordDecodeError::Truncated("MagicByte"));
419        }
420        let magic_byte = MagicByte::try_from(buf.get_u8())
421            .map_err(|msg| RecordDecodeError::InvalidValue("MagicByte", msg))?;
422
423        let metered_size =
424            buf.try_get_uint(magic_byte.metered_size_varlen as usize)
425                .map_err(|_| RecordDecodeError::Truncated("MeteredSize"))? as usize;
426
427        Ok(Self::with_size(
428            metered_size,
429            match magic_byte.record_type {
430                RecordType::Command => {
431                    StoredRecord::Plaintext(Record::Command(CommandRecord::try_from(buf.as_ref())?))
432                }
433                RecordType::Envelope => {
434                    StoredRecord::Plaintext(Record::Envelope(EnvelopeRecord::try_from(buf)?))
435                }
436                RecordType::EncryptedEnvelope => {
437                    StoredRecord::encrypted(EncryptedRecord::try_from(buf)?, metered_size)
438                }
439            },
440        ))
441    }
442}
443
444impl TryFrom<Bytes> for Metered<Record> {
445    type Error = RecordDecodeError;
446
447    fn try_from(buf: Bytes) -> Result<Self, Self::Error> {
448        let stored: Metered<StoredRecord> = buf.try_into()?;
449        let size = stored.metered_size();
450        match stored.into_inner() {
451            StoredRecord::Plaintext(record) => Ok(record),
452            StoredRecord::Encrypted { .. } => Err(RecordDecodeError::InvalidValue(
453                "RecordType",
454                "encrypted envelope requires decryption",
455            )),
456        }
457        .map(|record| Metered::with_size(size, record))
458    }
459}
460
461impl<T> Metered<Sequenced<T>> {
462    pub fn parts(&self) -> (StreamPosition, Metered<&T>) {
463        let size = self.metered_size();
464        let (position, inner) = self.as_ref().into_inner().parts();
465        (position, Metered::with_size(size, inner))
466    }
467
468    pub fn into_parts(self) -> (StreamPosition, Metered<T>) {
469        let size = self.metered_size();
470        let (position, inner) = self.into_inner().into_parts();
471        (position, Metered::with_size(size, inner))
472    }
473}
474
475#[cfg(test)]
476mod test {
477    use proptest::prelude::*;
478    use rstest::rstest;
479
480    use super::*;
481
482    struct LegacyPlaintextFrame<'a> {
483        record: &'a Record,
484    }
485
486    impl LegacyPlaintextFrame<'_> {
487        fn magic_byte(&self) -> MagicByte {
488            let metered_size = self.record.metered_size();
489            let metered_size_varlen = 8 - (metered_size.leading_zeros() / 8) as u8;
490            assert!(metered_size_varlen <= 3);
491
492            MagicByte {
493                record_type: match self.record {
494                    Record::Command(_) => RecordType::Command,
495                    Record::Envelope(_) => RecordType::Envelope,
496                },
497                metered_size_varlen,
498            }
499        }
500    }
501
502    impl Encodable for LegacyPlaintextFrame<'_> {
503        fn encoded_size(&self) -> usize {
504            let body_size = match self.record {
505                Record::Command(record) => record.encoded_size(),
506                Record::Envelope(record) => record.encoded_size(),
507            };
508            1 + self.magic_byte().metered_size_varlen as usize + body_size
509        }
510
511        fn encode_into(&self, buf: &mut impl BufMut) {
512            let magic_byte = self.magic_byte();
513            buf.put_u8(magic_byte.into());
514            buf.put_uint(
515                self.record.metered_size() as u64,
516                magic_byte.metered_size_varlen as usize,
517            );
518            match self.record {
519                Record::Command(record) => record.encode_into(buf),
520                Record::Envelope(record) => record.encode_into(buf),
521            }
522        }
523    }
524
525    fn legacy_plaintext_bytes(record: &Record) -> Bytes {
526        LegacyPlaintextFrame { record }.to_bytes()
527    }
528
529    fn semantic_metered_size(record: &Record) -> usize {
530        let (headers, body) = record.clone().into_parts();
531        8 + (2 * headers.len())
532            + headers
533                .iter()
534                .map(|header| header.name.len() + header.value.len())
535                .sum::<usize>()
536            + body.len()
537    }
538
539    fn bytes_strategy(allow_empty: bool) -> impl Strategy<Value = Bytes> {
540        prop_oneof![
541            prop::collection::vec(any::<u8>(), (if allow_empty { 0 } else { 1 })..10)
542                .prop_map(Bytes::from),
543            prop::collection::vec(any::<u8>(), 100..1000).prop_map(Bytes::from),
544        ]
545    }
546
547    fn header_strategy() -> impl Strategy<Value = Header> {
548        (bytes_strategy(false), bytes_strategy(true))
549            .prop_map(|(name, value)| Header { name, value })
550    }
551
552    fn headers_strategy() -> impl Strategy<Value = Vec<Header>> {
553        prop_oneof![
554            prop::collection::vec(header_strategy(), 0..10),
555            prop::collection::vec(header_strategy(), 200..300),
556        ]
557    }
558
559    fn command_strategy() -> impl Strategy<Value = CommandRecord> {
560        prop_oneof![
561            proptest::string::string_regex(&format!("[ -~]{{0,{MAX_FENCING_TOKEN_LENGTH}}}"))
562                .unwrap()
563                .prop_map(|token| CommandRecord::Fence(token.parse().unwrap())),
564            any::<SeqNum>().prop_map(CommandRecord::Trim),
565        ]
566    }
567
568    proptest!(
569        #![proptest_config(ProptestConfig::with_cases(10))]
570        #[test]
571        fn roundtrip_envelope(
572            seq_num in any::<SeqNum>(),
573            timestamp in any::<Timestamp>(),
574            headers in headers_strategy(),
575            body in bytes_strategy(true),
576        ) {
577            let record = Record::try_from_parts(headers, body).unwrap();
578            let metered_record: Metered<Record> = record.clone().into();
579            let encoded_record = Metered::from(StoredRecord::from(record.clone()))
580                .as_ref()
581                .to_bytes();
582            let legacy_record = legacy_plaintext_bytes(&record);
583            prop_assert_eq!(encoded_record.as_ref(), legacy_record.as_ref());
584            let decoded_record = Metered::try_from(encoded_record).unwrap();
585            prop_assert_eq!(&decoded_record, &metered_record);
586            let sequenced = decoded_record.sequenced(StreamPosition { seq_num, timestamp });
587            let (position, sequenced_record) = sequenced.into_parts();
588            assert_eq!(position, StreamPosition { seq_num, timestamp });
589            assert_eq!(sequenced_record.into_inner(), record);
590        }
591    );
592
593    proptest!(
594        #![proptest_config(ProptestConfig::with_cases(10))]
595        #[test]
596        fn roundtrip_metered(
597            headers in headers_strategy(),
598            body in bytes_strategy(true),
599        ) {
600            let record = Record::try_from_parts(headers.clone(), body.clone()).unwrap();
601            let encoded_record = Metered::from(StoredRecord::from(record.clone()))
602                .as_ref()
603                .to_bytes();
604            assert_eq!(record.metered_size(), semantic_metered_size(&record));
605            assert_eq!(record.metered_size(), try_metered_size(encoded_record.as_ref()).unwrap() as usize);
606        }
607    );
608
609    proptest!(
610        #![proptest_config(ProptestConfig::with_cases(10))]
611        #[test]
612        fn roundtrip_command_metered(command in command_strategy()) {
613            let record = Record::Command(command);
614            let encoded_record = Metered::from(StoredRecord::from(record.clone()))
615                .as_ref()
616                .to_bytes();
617            let expected_metered = semantic_metered_size(&record);
618            let wire_metered = try_metered_size(encoded_record.as_ref()).unwrap() as usize;
619            let decoded_record: Metered<Record> = Metered::try_from(encoded_record).unwrap();
620
621            assert_eq!(record.metered_size(), expected_metered);
622            assert_eq!(record.metered_size(), wire_metered);
623            prop_assert_eq!(decoded_record, Metered::<Record>::from(record));
624        }
625    );
626
627    #[test]
628    fn roundtrip_encrypted_stored_record() {
629        let mut encoded = BytesMut::with_capacity(1 + 12 + 10 + 16);
630        encoded.put_u8(0x02);
631        encoded.put_slice(b"0123456789ab");
632        encoded.put_slice(b"ciphertext");
633        encoded.put_slice(b"0123456789abcdef");
634        let record =
635            StoredRecord::encrypted(EncryptedRecord::try_from(encoded.freeze()).unwrap(), 123);
636        let metered_record: Metered<StoredRecord> = record.clone().into();
637        let encoded_record = metered_record.as_ref().to_bytes();
638        let decoded_record = Metered::try_from(encoded_record).unwrap();
639        assert_eq!(decoded_record, metered_record);
640    }
641
642    #[test]
643    fn empty_header_name_solo() {
644        let headers = vec![Header {
645            name: Bytes::new(),
646            value: Bytes::from("hi"),
647        }];
648        let body = Bytes::from("hello");
649        assert_eq!(
650            Record::try_from_parts(headers, body),
651            Err(RecordPartsError::UnknownCommand)
652        );
653    }
654
655    #[test]
656    fn empty_header_name_among_others() {
657        let headers = vec![
658            Header {
659                name: Bytes::from("boku"),
660                value: Bytes::from("hi"),
661            },
662            Header {
663                name: Bytes::new(),
664                value: Bytes::from("hi"),
665            },
666        ];
667        let body = Bytes::from("hello");
668        assert_eq!(
669            Record::try_from_parts(headers, body),
670            Err(RecordPartsError::Header(HeaderValidationError::NameEmpty))
671        );
672    }
673
674    fn command_parts(op: &'static [u8], payload: &'static [u8]) -> (Vec<Header>, Bytes) {
675        let headers = vec![Header {
676            name: Bytes::new(),
677            value: Bytes::from_static(op),
678        }];
679        let body = Bytes::from_static(payload);
680        (headers, body)
681    }
682
683    fn assert_valid_command_record(op: &'static [u8], payload: &'static [u8]) {
684        let (headers, body) = command_parts(op, payload);
685        let record = Record::try_from_parts(headers.clone(), body.clone()).unwrap();
686        let record_metered = record.metered_size();
687        match &record {
688            Record::Command(cmd) => {
689                assert_eq!(cmd.op().to_id(), op);
690                assert_eq!(cmd.payload().as_ref(), payload);
691            }
692            other => panic!("Command expected, got {other:?}"),
693        }
694        let encoded_record = Metered::from(StoredRecord::from(record.clone()))
695            .as_ref()
696            .to_bytes();
697        assert_eq!(record_metered, semantic_metered_size(&record));
698        assert_eq!(
699            record_metered,
700            try_metered_size(encoded_record.as_ref()).unwrap() as usize
701        );
702        assert_eq!(
703            encoded_record.as_ref(),
704            legacy_plaintext_bytes(&record).as_ref()
705        );
706        let sequenced_record = record.clone().sequenced(StreamPosition {
707            seq_num: 42,
708            timestamp: 100_000,
709        });
710        let sequenced_metered = sequenced_record.metered_size();
711        assert_eq!(record_metered, sequenced_metered);
712        assert_eq!(
713            sequenced_record.position,
714            StreamPosition {
715                seq_num: 42,
716                timestamp: 100_000,
717            }
718        );
719        assert_eq!(
720            sequenced_record.inner,
721            Record::try_from_parts(headers, body).unwrap()
722        );
723    }
724
725    #[rstest]
726    #[case::fence_empty(b"fence", b"")]
727    #[case::fence_uuid(b"fence", b"my-special-uuid")]
728    #[case::trim_0(b"trim", b"\x00\x00\x00\x00\x00\x00\x00\x00")]
729    fn valid_command_records(#[case] op: &'static [u8], #[case] payload: &'static [u8]) {
730        assert_valid_command_record(op, payload);
731    }
732
733    #[rstest]
734    #[case::fence_too_long(
735        b"fence",
736        b"toolongtoolongtoolongtoolongtoolongtoolongtoolong",
737        RecordPartsError::CommandPayload(
738            CommandOp::Fence,
739            CommandPayloadError::FencingTokenTooLong(FencingTokenTooLongError(49)),
740        )
741    )]
742    #[case::trim_empty(
743        b"trim",
744        b"",
745        RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(0),)
746    )]
747    #[case::trim_overflow(
748        b"trim",
749        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00",
750        RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(9),)
751    )]
752    fn invalid_command_records(
753        #[case] op: &'static [u8],
754        #[case] payload: &'static [u8],
755        #[case] expected: RecordPartsError,
756    ) {
757        let (headers, body) = command_parts(op, payload);
758        assert_eq!(Record::try_from_parts(headers, body), Err(expected));
759    }
760
761    #[rstest]
762    #[case(0b0000_0010, MagicByte { record_type: RecordType::Envelope, metered_size_varlen: 1})]
763    #[case(0b0001_0010, MagicByte { record_type: RecordType::Envelope, metered_size_varlen: 3})]
764    #[case(0b0000_0011, MagicByte { record_type: RecordType::EncryptedEnvelope, metered_size_varlen: 1})]
765    #[case(0b0000_1001, MagicByte { record_type: RecordType::Command, metered_size_varlen: 2})]
766    fn valid_magic_byte_parsing(#[case] as_u8: u8, #[case] magic_byte: MagicByte) {
767        assert_eq!(MagicByte::try_from(as_u8).unwrap(), magic_byte);
768        assert_eq!(u8::from(magic_byte), as_u8);
769    }
770
771    #[rstest]
772    #[case(0b0000_1101, "invalid record type ordinal")]
773    #[case(0b0001_1001, "invalid metered_size_varlen")]
774    fn invalid_magic_byte_parsing(#[case] as_u8: u8, #[case] expected: &'static str) {
775        assert_eq!(MagicByte::try_from(as_u8), Err(expected));
776    }
777
778    #[test]
779    fn metered_record_truncated_after_magic_byte_returns_error() {
780        // Magic byte: Envelope (0b0000_0010), metered_size_varlen = 1 → expects 1 more byte.
781        let truncated = Bytes::from_static(&[0b0000_0010]);
782        let result: Result<Metered<Record>, _> = truncated.try_into();
783        assert_eq!(result, Err(RecordDecodeError::Truncated("MeteredSize")));
784    }
785
786    #[test]
787    fn test_read_varint() {
788        let data = [0u8, 0, 0, 1, 0, 0, 0];
789
790        assert_eq!(read_vint_u32_be(&data[..4]), 1u32);
791        assert_eq!(read_vint_u32_be(&data[2..5]), 2u32.pow(8));
792        assert_eq!(read_vint_u32_be(&data[2..6]), 2u32.pow(16));
793        assert_eq!(read_vint_u32_be(&data[3..]), 2u32.pow(24));
794    }
795}