Skip to main content

s2_common/record/
mod.rs

1mod batcher;
2mod command;
3mod encryption;
4mod envelope;
5mod fencing;
6mod iterator;
7mod metering;
8
9pub use batcher::{RecordBatch, RecordBatcher};
10use bytes::{Buf, BufMut, Bytes, BytesMut};
11pub use command::CommandRecord;
12use command::{CommandOp, CommandPayloadError};
13pub use encryption::{
14    EncryptedRecord, RecordDecryptionError, decrypt_stored_record, encrypt_record,
15};
16pub use envelope::EnvelopeRecord;
17use envelope::HeaderValidationError;
18pub use fencing::{FencingToken, FencingTokenTooLongError, MAX_FENCING_TOKEN_LENGTH};
19pub use iterator::StoredRecordIterator;
20pub use metering::{Metered, MeteredExt, MeteredSize};
21use strum::FromRepr;
22
23use crate::deep_size::DeepSize;
24
/// Sequence number component of a [`StreamPosition`].
pub type SeqNum = u64;
/// Sequence number that is statically guaranteed to be non-zero.
pub type NonZeroSeqNum = std::num::NonZeroU64;
/// Timestamp component of a [`StreamPosition`].
// NOTE(review): the unit/epoch of this value is not visible in this file — confirm.
pub type Timestamp = u64;
28
/// A position expressed as a sequence number paired with a timestamp.
///
/// Displayed as `<seq_num> @ <timestamp>`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct StreamPosition {
    /// Sequence number at this position.
    pub seq_num: SeqNum,
    /// Timestamp at this position.
    pub timestamp: Timestamp,
}
34
35impl StreamPosition {
36    pub const MIN: StreamPosition = StreamPosition {
37        seq_num: SeqNum::MIN,
38        timestamp: Timestamp::MIN,
39    };
40}
41
42impl std::fmt::Display for StreamPosition {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(f, "{} @ {}", self.seq_num, self.timestamp)
45    }
46}
47
48impl DeepSize for StreamPosition {
49    fn deep_size(&self) -> usize {
50        self.seq_num.deep_size() + self.timestamp.deep_size()
51    }
52}
53
/// Error produced while decoding a record from its encoded bytes.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum RecordDecodeError {
    /// The input ended before the named field (e.g. `"MagicByte"`, `"MeteredSize"`) could be read.
    #[error("truncated: {0}")]
    Truncated(&'static str),
    /// The named field (first value) held an unacceptable value; the second value says why.
    #[error("invalid value [{0}]: {1}")]
    InvalidValue(&'static str, &'static str),
}
61
/// Error produced while assembling a [`Record`] from headers and a body.
#[derive(Debug, PartialEq, thiserror::Error)]
pub enum RecordPartsError {
    /// The sole header had an empty name but its value is not a known command id.
    #[error("unknown command")]
    UnknownCommand,
    /// The command was recognised but its payload (the record body) was rejected.
    #[error("invalid `{0}` command: {1}")]
    CommandPayload(CommandOp, CommandPayloadError),
    /// Header validation failed while building an envelope record.
    #[error("invalid header: {0}")]
    Header(#[from] HeaderValidationError),
}
71
/// A single name/value pair attached to a record; both sides are raw bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    /// Header name. An empty name on a record's only header marks a command record.
    pub name: Bytes,
    /// Header value.
    pub value: Bytes,
}
77
78impl DeepSize for Header {
79    fn deep_size(&self) -> usize {
80        self.name.len() + self.value.len()
81    }
82}
83
/// Kind of record, stored as the low three bits of the magic byte.
#[derive(Clone, Copy, Debug, PartialEq, FromRepr)]
#[repr(u8)]
pub enum RecordType {
    /// A plaintext command record.
    Command = 1,
    /// A plaintext envelope record.
    Envelope = 2,
    /// An encrypted envelope record.
    EncryptedEnvelope = 3,
}
91
/// Decoded form of the first byte of an encoded record.
///
/// As a `u8`, the low three bits carry the [`RecordType`] and the next two bits carry
/// `metered_size_varlen - 1`.
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct MagicByte {
    /// Kind of record that follows.
    pub record_type: RecordType,
    /// Number of bytes (1 to 3 when parsed from a `u8`) holding the metered size that
    /// immediately follows the magic byte.
    pub metered_size_varlen: u8,
}
97
/// Interprets 1 to 4 bytes as a big-endian unsigned integer.
///
/// # Panics
///
/// Panics when `bytes` is empty or longer than a `u32`.
fn read_vint_u32_be(bytes: &[u8]) -> u32 {
    let too_long = bytes.len() > size_of::<u32>();
    if too_long || bytes.is_empty() {
        panic!("invalid variable int bytes = {} len", bytes.len())
    }
    // Most significant byte comes first, so shift the running value up before adding each byte.
    bytes
        .iter()
        .fold(0u32, |value, &byte| (value << 8) | u32::from(byte))
}
109
110pub fn try_metered_size(record_bytes: &[u8]) -> Result<u32, &'static str> {
111    let magic_byte_u8 = *record_bytes.first().ok_or("byte range is empty")?;
112    let magic_byte = MagicByte::try_from(magic_byte_u8)?;
113    Ok(read_vint_u32_be(
114        record_bytes
115            .get(1..1 + magic_byte.metered_size_varlen as usize)
116            .ok_or("byte range doesn't include bytes for metered size")?,
117    ))
118}
119
120impl MeteredSize for Record {
121    fn metered_size(&self) -> usize {
122        match self {
123            Self::Command(command) => command.metered_size(),
124            Self::Envelope(envelope) => envelope.metered_size(),
125        }
126    }
127}
128
129impl TryFrom<u8> for MagicByte {
130    type Error = &'static str;
131
132    fn try_from(value: u8) -> Result<Self, Self::Error> {
133        let record_type =
134            RecordType::from_repr(value & 0b111).ok_or("invalid record type ordinal")?;
135        Ok(Self {
136            record_type,
137            metered_size_varlen: match (value >> 3) & 0b11 {
138                0 => 1u8,
139                1 => 2u8,
140                2 => 3u8,
141                _ => Err("invalid metered_size_varlen")?,
142            },
143        })
144    }
145}
146
147impl From<MagicByte> for u8 {
148    fn from(value: MagicByte) -> Self {
149        ((value.metered_size_varlen - 1) << 3) | value.record_type as u8
150    }
151}
152
/// A plaintext record: either a command or an envelope.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Record {
    /// A command record.
    Command(CommandRecord),
    /// An envelope record (headers plus body).
    Envelope(EnvelopeRecord),
}
158
159impl DeepSize for Record {
160    fn deep_size(&self) -> usize {
161        match self {
162            Self::Command(c) => c.deep_size(),
163            Self::Envelope(e) => e.deep_size(),
164        }
165    }
166}
167
168impl Record {
169    pub fn try_from_parts(headers: Vec<Header>, body: Bytes) -> Result<Self, RecordPartsError> {
170        if headers.len() == 1 {
171            let header = &headers[0];
172            if header.name.is_empty() {
173                let op = CommandOp::from_id(header.value.as_ref())
174                    .ok_or(RecordPartsError::UnknownCommand)?;
175                let command_record = CommandRecord::try_from_parts(op, body.as_ref())
176                    .map_err(|e| RecordPartsError::CommandPayload(op, e))?;
177                return Ok(Self::Command(command_record));
178            }
179        }
180        let envelope = EnvelopeRecord::try_from_parts(headers, body)?;
181        Ok(Self::Envelope(envelope))
182    }
183
184    pub fn sequenced(self, position: StreamPosition) -> SequencedRecord {
185        Sequenced::new(position, self)
186    }
187
188    pub fn into_parts(self) -> (Vec<Header>, Bytes) {
189        match self {
190            Record::Envelope(e) => e.into_parts(),
191            Record::Command(c) => {
192                let op = c.op();
193                let header = Header {
194                    name: Bytes::new(),
195                    value: Bytes::from_static(op.to_id()),
196                };
197                (vec![header], c.payload())
198            }
199        }
200    }
201}
202
/// A record in the form it is encoded and decoded: plaintext or encrypted.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum StoredRecord {
    /// A plaintext [`Record`].
    Plaintext(Record),
    /// An encrypted record together with an explicitly carried metered size.
    Encrypted {
        /// Metered size reported for this record (it is not derived from `record`).
        metered_size: usize,
        /// The encrypted payload.
        record: EncryptedRecord,
    },
}
211
212impl StoredRecord {
213    pub(crate) fn encrypted(record: EncryptedRecord, metered_size: usize) -> Self {
214        Self::Encrypted {
215            metered_size,
216            record,
217        }
218    }
219
220    fn record_type(&self) -> RecordType {
221        match self {
222            Self::Plaintext(Record::Command(_)) => RecordType::Command,
223            Self::Plaintext(Record::Envelope(_)) => RecordType::Envelope,
224            Self::Encrypted { .. } => RecordType::EncryptedEnvelope,
225        }
226    }
227
228    fn encoded_body_size(&self) -> usize {
229        match self {
230            Self::Plaintext(Record::Command(record)) => record.encoded_size(),
231            Self::Plaintext(Record::Envelope(record)) => record.encoded_size(),
232            Self::Encrypted { record, .. } => record.encoded_size(),
233        }
234    }
235
236    fn encode_body_into(&self, buf: &mut impl BufMut) {
237        match self {
238            Self::Plaintext(Record::Command(record)) => record.encode_into(buf),
239            Self::Plaintext(Record::Envelope(record)) => record.encode_into(buf),
240            Self::Encrypted { record, .. } => record.encode_into(buf),
241        }
242    }
243
244    pub fn encryption_algorithm(&self) -> Option<crate::encryption::EncryptionAlgorithm> {
245        match self {
246            Self::Plaintext(_) => None,
247            Self::Encrypted { record, .. } => Some(record.algorithm()),
248        }
249    }
250
251    pub fn max_assignable_seq_num(&self) -> SeqNum {
252        match self {
253            Self::Plaintext(_) => SeqNum::MAX,
254            Self::Encrypted { record, .. } => record.max_assignable_seq_num(),
255        }
256    }
257}
258
259impl DeepSize for StoredRecord {
260    fn deep_size(&self) -> usize {
261        match self {
262            Self::Plaintext(record) => record.deep_size(),
263            Self::Encrypted {
264                metered_size,
265                record,
266            } => metered_size.deep_size() + record.deep_size(),
267        }
268    }
269}
270
271impl MeteredSize for StoredRecord {
272    fn metered_size(&self) -> usize {
273        match self {
274            Self::Plaintext(record) => record.metered_size(),
275            Self::Encrypted { metered_size, .. } => *metered_size,
276        }
277    }
278}
279
280impl From<Record> for StoredRecord {
281    fn from(value: Record) -> Self {
282        Self::Plaintext(value)
283    }
284}
285
286impl From<Record> for Metered<StoredRecord> {
287    fn from(value: Record) -> Self {
288        Self::from(StoredRecord::from(value))
289    }
290}
291
292pub fn decode_if_command_record(record: &[u8]) -> Result<Option<CommandRecord>, RecordDecodeError> {
293    if record.is_empty() {
294        return Err(RecordDecodeError::Truncated("MagicByte"));
295    }
296    let magic_byte = MagicByte::try_from(record[0])
297        .map_err(|msg| RecordDecodeError::InvalidValue("MagicByte", msg))?;
298    match magic_byte.record_type {
299        RecordType::Command => {
300            let offset = 1 + magic_byte.metered_size_varlen as usize;
301            if record.len() < offset {
302                return Err(RecordDecodeError::Truncated("MeteredSize"));
303            }
304            Ok(Some(CommandRecord::try_from(&record[offset..])?))
305        }
306        RecordType::Envelope | RecordType::EncryptedEnvelope => Ok(None),
307    }
308}
309
310pub trait Encodable {
311    fn to_bytes(&self) -> Bytes {
312        let expected_size = self.encoded_size();
313        let mut buf = BytesMut::with_capacity(expected_size);
314        self.encode_into(&mut buf);
315        assert_eq!(buf.len(), expected_size, "no reallocation");
316        buf.freeze()
317    }
318
319    fn encoded_size(&self) -> usize;
320
321    fn encode_into(&self, buf: &mut impl BufMut);
322}
323
324impl Encodable for Metered<&StoredRecord> {
325    fn encoded_size(&self) -> usize {
326        1 + self.magic_byte().metered_size_varlen as usize + self.encoded_body_size()
327    }
328
329    fn encode_into(&self, buf: &mut impl BufMut) {
330        let magic_byte = self.magic_byte();
331        buf.put_u8(magic_byte.into());
332        buf.put_uint(
333            self.metered_size() as u64,
334            magic_byte.metered_size_varlen as usize,
335        );
336        self.encode_body_into(buf);
337    }
338}
339
/// A value paired with the [`StreamPosition`] it was assigned.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Sequenced<T> {
    /// Position assigned to `inner`.
    position: StreamPosition,
    /// The wrapped value.
    inner: T,
}
345
346impl<T> Sequenced<T> {
347    pub const fn new(position: StreamPosition, inner: T) -> Self {
348        Self { position, inner }
349    }
350
351    pub const fn position(&self) -> &StreamPosition {
352        &self.position
353    }
354
355    pub fn inner(&self) -> &T {
356        &self.inner
357    }
358
359    pub fn as_ref(&self) -> Sequenced<&T> {
360        Sequenced::new(self.position, &self.inner)
361    }
362
363    pub fn parts(&self) -> (StreamPosition, &T) {
364        (self.position, &self.inner)
365    }
366
367    pub fn into_parts(self) -> (StreamPosition, T) {
368        (self.position, self.inner)
369    }
370}
371
/// Raw bytes paired with a stream position.
pub type StoredSequencedBytes = Sequenced<Bytes>;
/// A plaintext [`Record`] paired with a stream position.
pub type SequencedRecord = Sequenced<Record>;
/// A [`StoredRecord`] paired with a stream position.
pub type StoredSequencedRecord = Sequenced<StoredRecord>;
375
376impl<T> MeteredSize for Sequenced<T>
377where
378    T: MeteredSize,
379{
380    fn metered_size(&self) -> usize {
381        self.inner.metered_size()
382    }
383}
384
385impl<T> DeepSize for Sequenced<T>
386where
387    T: DeepSize,
388{
389    fn deep_size(&self) -> usize {
390        self.position.deep_size() + self.inner.deep_size()
391    }
392}
393
394impl<T> Metered<T>
395where
396    T: MeteredSize,
397{
398    pub fn sequenced(self, position: StreamPosition) -> Metered<Sequenced<T>> {
399        Metered::with_size(
400            self.metered_size(),
401            Sequenced::new(position, self.into_inner()),
402        )
403    }
404}
405
406impl Metered<&StoredRecord> {
407    fn magic_byte(&self) -> MagicByte {
408        let metered_size = self.metered_size();
409        let metered_size_varlen = 8 - (metered_size.leading_zeros() / 8) as u8;
410        if metered_size_varlen > 3 {
411            panic!("illegal metered size varlen {metered_size} for record")
412        }
413        MagicByte {
414            record_type: self.record_type(),
415            metered_size_varlen,
416        }
417    }
418}
419
420impl TryFrom<Bytes> for Metered<StoredRecord> {
421    type Error = RecordDecodeError;
422
423    fn try_from(mut buf: Bytes) -> Result<Self, Self::Error> {
424        if buf.is_empty() {
425            return Err(RecordDecodeError::Truncated("MagicByte"));
426        }
427        let magic_byte = MagicByte::try_from(buf.get_u8())
428            .map_err(|msg| RecordDecodeError::InvalidValue("MagicByte", msg))?;
429
430        let metered_size =
431            buf.try_get_uint(magic_byte.metered_size_varlen as usize)
432                .map_err(|_| RecordDecodeError::Truncated("MeteredSize"))? as usize;
433
434        Ok(Self::with_size(
435            metered_size,
436            match magic_byte.record_type {
437                RecordType::Command => {
438                    StoredRecord::Plaintext(Record::Command(CommandRecord::try_from(buf.as_ref())?))
439                }
440                RecordType::Envelope => {
441                    StoredRecord::Plaintext(Record::Envelope(EnvelopeRecord::try_from(buf)?))
442                }
443                RecordType::EncryptedEnvelope => {
444                    StoredRecord::encrypted(EncryptedRecord::try_from(buf)?, metered_size)
445                }
446            },
447        ))
448    }
449}
450
451impl TryFrom<Bytes> for Metered<Record> {
452    type Error = RecordDecodeError;
453
454    fn try_from(buf: Bytes) -> Result<Self, Self::Error> {
455        let stored: Metered<StoredRecord> = buf.try_into()?;
456        let size = stored.metered_size();
457        match stored.into_inner() {
458            StoredRecord::Plaintext(record) => Ok(record),
459            StoredRecord::Encrypted { .. } => Err(RecordDecodeError::InvalidValue(
460                "RecordType",
461                "encrypted envelope requires decryption",
462            )),
463        }
464        .map(|record| Metered::with_size(size, record))
465    }
466}
467
468impl<T> Metered<Sequenced<T>> {
469    pub fn parts(&self) -> (StreamPosition, Metered<&T>) {
470        let size = self.metered_size();
471        let (position, inner) = self.as_ref().into_inner().parts();
472        (position, Metered::with_size(size, inner))
473    }
474
475    pub fn into_parts(self) -> (StreamPosition, Metered<T>) {
476        let size = self.metered_size();
477        let (position, inner) = self.into_inner().into_parts();
478        (position, Metered::with_size(size, inner))
479    }
480}
481
// Unit and property tests for record construction, framing, and round-tripping.
#[cfg(test)]
mod test {
    use proptest::prelude::*;
    use rstest::rstest;

    use super::*;

    // Independent re-implementation of plaintext framing (magic byte, metered size,
    // body) used to check that `Metered<&StoredRecord>` encodes plaintext records to
    // the same bytes.
    struct LegacyPlaintextFrame<'a> {
        record: &'a Record,
    }

    impl LegacyPlaintextFrame<'_> {
        // Derives the magic byte directly from the plaintext record; asserts the
        // metered size fits in at most 3 bytes.
        fn magic_byte(&self) -> MagicByte {
            let metered_size = self.record.metered_size();
            let metered_size_varlen = 8 - (metered_size.leading_zeros() / 8) as u8;
            assert!(metered_size_varlen <= 3);

            MagicByte {
                record_type: match self.record {
                    Record::Command(_) => RecordType::Command,
                    Record::Envelope(_) => RecordType::Envelope,
                },
                metered_size_varlen,
            }
        }
    }

    impl Encodable for LegacyPlaintextFrame<'_> {
        // Magic byte + metered-size bytes + body.
        fn encoded_size(&self) -> usize {
            let body_size = match self.record {
                Record::Command(record) => record.encoded_size(),
                Record::Envelope(record) => record.encoded_size(),
            };
            1 + self.magic_byte().metered_size_varlen as usize + body_size
        }

        // Writes the magic byte, the big-endian metered size, then the record body.
        fn encode_into(&self, buf: &mut impl BufMut) {
            let magic_byte = self.magic_byte();
            buf.put_u8(magic_byte.into());
            buf.put_uint(
                self.record.metered_size() as u64,
                magic_byte.metered_size_varlen as usize,
            );
            match self.record {
                Record::Command(record) => record.encode_into(buf),
                Record::Envelope(record) => record.encode_into(buf),
            }
        }
    }

    // Encodes `record` through the reference framing above.
    fn legacy_plaintext_bytes(record: &Record) -> Bytes {
        LegacyPlaintextFrame { record }.to_bytes()
    }

    // Expected metered size computed from the record's parts: a base of 8, plus 2 per
    // header, plus the header name/value lengths, plus the body length.
    fn semantic_metered_size(record: &Record) -> usize {
        let (headers, body) = record.clone().into_parts();
        8 + (2 * headers.len())
            + headers
                .iter()
                .map(|header| header.name.len() + header.value.len())
                .sum::<usize>()
            + body.len()
    }

    // Generates either a short byte string (under 10 bytes, optionally empty) or a
    // long one (100 to 999 bytes).
    fn bytes_strategy(allow_empty: bool) -> impl Strategy<Value = Bytes> {
        prop_oneof![
            prop::collection::vec(any::<u8>(), (if allow_empty { 0 } else { 1 })..10)
                .prop_map(Bytes::from),
            prop::collection::vec(any::<u8>(), 100..1000).prop_map(Bytes::from),
        ]
    }

    // Generates a header with a non-empty name and a possibly empty value.
    fn header_strategy() -> impl Strategy<Value = Header> {
        (bytes_strategy(false), bytes_strategy(true))
            .prop_map(|(name, value)| Header { name, value })
    }

    // Generates either a few headers (0 to 9) or many (200 to 299).
    fn headers_strategy() -> impl Strategy<Value = Vec<Header>> {
        prop_oneof![
            prop::collection::vec(header_strategy(), 0..10),
            prop::collection::vec(header_strategy(), 200..300),
        ]
    }

    // Generates a fence command with a printable-ASCII token of up to
    // `MAX_FENCING_TOKEN_LENGTH` characters, or a trim command with any sequence number.
    fn command_strategy() -> impl Strategy<Value = CommandRecord> {
        prop_oneof![
            proptest::string::string_regex(&format!("[ -~]{{0,{MAX_FENCING_TOKEN_LENGTH}}}"))
                .unwrap()
                .prop_map(|token| CommandRecord::Fence(token.parse().unwrap())),
            any::<SeqNum>().prop_map(CommandRecord::Trim),
        ]
    }

    // A record built from generated headers and body encodes identically to the
    // reference framing, decodes back to an equal metered record, and survives being
    // sequenced and split again.
    proptest!(
        #![proptest_config(ProptestConfig::with_cases(10))]
        #[test]
        fn roundtrip_envelope(
            seq_num in any::<SeqNum>(),
            timestamp in any::<Timestamp>(),
            headers in headers_strategy(),
            body in bytes_strategy(true),
        ) {
            let record = Record::try_from_parts(headers, body).unwrap();
            let metered_record: Metered<Record> = record.clone().into();
            let encoded_record = Metered::from(StoredRecord::from(record.clone()))
                .as_ref()
                .to_bytes();
            let legacy_record = legacy_plaintext_bytes(&record);
            prop_assert_eq!(encoded_record.as_ref(), legacy_record.as_ref());
            let decoded_record = Metered::try_from(encoded_record).unwrap();
            prop_assert_eq!(&decoded_record, &metered_record);
            let sequenced = decoded_record.sequenced(StreamPosition { seq_num, timestamp });
            let (position, sequenced_record) = sequenced.into_parts();
            assert_eq!(position, StreamPosition { seq_num, timestamp });
            assert_eq!(sequenced_record.into_inner(), record);
        }
    );

    // The record's metered size matches both the formula in `semantic_metered_size`
    // and the value read back from the encoded bytes.
    proptest!(
        #![proptest_config(ProptestConfig::with_cases(10))]
        #[test]
        fn roundtrip_metered(
            headers in headers_strategy(),
            body in bytes_strategy(true),
        ) {
            let record = Record::try_from_parts(headers.clone(), body.clone()).unwrap();
            let encoded_record = Metered::from(StoredRecord::from(record.clone()))
                .as_ref()
                .to_bytes();
            assert_eq!(record.metered_size(), semantic_metered_size(&record));
            assert_eq!(record.metered_size(), try_metered_size(encoded_record.as_ref()).unwrap() as usize);
        }
    );

    // Same metered-size checks for command records, plus a decode round trip.
    proptest!(
        #![proptest_config(ProptestConfig::with_cases(10))]
        #[test]
        fn roundtrip_command_metered(command in command_strategy()) {
            let record = Record::Command(command);
            let encoded_record = Metered::from(StoredRecord::from(record.clone()))
                .as_ref()
                .to_bytes();
            let expected_metered = semantic_metered_size(&record);
            let wire_metered = try_metered_size(encoded_record.as_ref()).unwrap() as usize;
            let decoded_record: Metered<Record> = Metered::try_from(encoded_record).unwrap();

            assert_eq!(record.metered_size(), expected_metered);
            assert_eq!(record.metered_size(), wire_metered);
            prop_assert_eq!(decoded_record, Metered::<Record>::from(record));
        }
    );

    // An encrypted stored record with an explicit metered size of 123 encodes and
    // decodes back to an equal value.
    // NOTE(review): the hand-built buffer is 1 + 12 + 10 + 16 bytes; presumably an
    // algorithm id, nonce, ciphertext, and tag — confirm against `EncryptedRecord`.
    #[test]
    fn roundtrip_encrypted_stored_record() {
        let mut encoded = BytesMut::with_capacity(1 + 12 + 10 + 16);
        encoded.put_u8(0x02);
        encoded.put_slice(b"0123456789ab");
        encoded.put_slice(b"ciphertext");
        encoded.put_slice(b"0123456789abcdef");
        let record =
            StoredRecord::encrypted(EncryptedRecord::try_from(encoded.freeze()).unwrap(), 123);
        let metered_record: Metered<StoredRecord> = record.clone().into();
        let encoded_record = metered_record.as_ref().to_bytes();
        let decoded_record = Metered::try_from(encoded_record).unwrap();
        assert_eq!(decoded_record, metered_record);
    }

    // A lone header with an empty name is treated as a command; "hi" is not a known
    // command id.
    #[test]
    fn empty_header_name_solo() {
        let headers = vec![Header {
            name: Bytes::new(),
            value: Bytes::from("hi"),
        }];
        let body = Bytes::from("hello");
        assert_eq!(
            Record::try_from_parts(headers, body),
            Err(RecordPartsError::UnknownCommand)
        );
    }

    // With more than one header the input goes down the envelope path, where an
    // empty header name is a validation error.
    #[test]
    fn empty_header_name_among_others() {
        let headers = vec![
            Header {
                name: Bytes::from("boku"),
                value: Bytes::from("hi"),
            },
            Header {
                name: Bytes::new(),
                value: Bytes::from("hi"),
            },
        ];
        let body = Bytes::from("hello");
        assert_eq!(
            Record::try_from_parts(headers, body),
            Err(RecordPartsError::Header(HeaderValidationError::NameEmpty))
        );
    }

    // Builds the (headers, body) pair that represents a command: one header with an
    // empty name whose value is the op id, and the payload as the body.
    fn command_parts(op: &'static [u8], payload: &'static [u8]) -> (Vec<Header>, Bytes) {
        let headers = vec![Header {
            name: Bytes::new(),
            value: Bytes::from_static(op),
        }];
        let body = Bytes::from_static(payload);
        (headers, body)
    }

    // Checks that the given op/payload parse to a command record whose op id, payload,
    // metered size, encoding, and sequenced form are all as expected.
    fn assert_valid_command_record(op: &'static [u8], payload: &'static [u8]) {
        let (headers, body) = command_parts(op, payload);
        let record = Record::try_from_parts(headers.clone(), body.clone()).unwrap();
        let record_metered = record.metered_size();
        match &record {
            Record::Command(cmd) => {
                assert_eq!(cmd.op().to_id(), op);
                assert_eq!(cmd.payload().as_ref(), payload);
            }
            other => panic!("Command expected, got {other:?}"),
        }
        let encoded_record = Metered::from(StoredRecord::from(record.clone()))
            .as_ref()
            .to_bytes();
        assert_eq!(record_metered, semantic_metered_size(&record));
        assert_eq!(
            record_metered,
            try_metered_size(encoded_record.as_ref()).unwrap() as usize
        );
        assert_eq!(
            encoded_record.as_ref(),
            legacy_plaintext_bytes(&record).as_ref()
        );
        let sequenced_record = record.clone().sequenced(StreamPosition {
            seq_num: 42,
            timestamp: 100_000,
        });
        // Sequencing must not change the metered size.
        let sequenced_metered = sequenced_record.metered_size();
        assert_eq!(record_metered, sequenced_metered);
        assert_eq!(
            sequenced_record.position,
            StreamPosition {
                seq_num: 42,
                timestamp: 100_000,
            }
        );
        assert_eq!(
            sequenced_record.inner,
            Record::try_from_parts(headers, body).unwrap()
        );
    }

    // Well-formed fence and trim commands.
    #[rstest]
    #[case::fence_empty(b"fence", b"")]
    #[case::fence_uuid(b"fence", b"my-special-uuid")]
    #[case::trim_0(b"trim", b"\x00\x00\x00\x00\x00\x00\x00\x00")]
    fn valid_command_records(#[case] op: &'static [u8], #[case] payload: &'static [u8]) {
        assert_valid_command_record(op, payload);
    }

    // Known commands with unacceptable payloads: an over-long fencing token and trim
    // payloads that are not 8 bytes long.
    #[rstest]
    #[case::fence_too_long(
        b"fence",
        b"toolongtoolongtoolongtoolongtoolongtoolongtoolong",
        RecordPartsError::CommandPayload(
            CommandOp::Fence,
            CommandPayloadError::FencingTokenTooLong(FencingTokenTooLongError(49)),
        )
    )]
    #[case::trim_empty(
        b"trim",
        b"",
        RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(0),)
    )]
    #[case::trim_overflow(
        b"trim",
        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00",
        RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(9),)
    )]
    fn invalid_command_records(
        #[case] op: &'static [u8],
        #[case] payload: &'static [u8],
        #[case] expected: RecordPartsError,
    ) {
        let (headers, body) = command_parts(op, payload);
        assert_eq!(Record::try_from_parts(headers, body), Err(expected));
    }

    // Magic bytes that parse, and that convert back to the same `u8`.
    #[rstest]
    #[case(0b0000_0010, MagicByte { record_type: RecordType::Envelope, metered_size_varlen: 1})]
    #[case(0b0001_0010, MagicByte { record_type: RecordType::Envelope, metered_size_varlen: 3})]
    #[case(0b0000_0011, MagicByte { record_type: RecordType::EncryptedEnvelope, metered_size_varlen: 1})]
    #[case(0b0000_1001, MagicByte { record_type: RecordType::Command, metered_size_varlen: 2})]
    fn valid_magic_byte_parsing(#[case] as_u8: u8, #[case] magic_byte: MagicByte) {
        assert_eq!(MagicByte::try_from(as_u8).unwrap(), magic_byte);
        assert_eq!(u8::from(magic_byte), as_u8);
    }

    // Magic bytes rejected for an unknown record type or an unused width code.
    #[rstest]
    #[case(0b0000_1101, "invalid record type ordinal")]
    #[case(0b0001_1001, "invalid metered_size_varlen")]
    fn invalid_magic_byte_parsing(#[case] as_u8: u8, #[case] expected: &'static str) {
        assert_eq!(MagicByte::try_from(as_u8), Err(expected));
    }

    // Input that ends right after the magic byte is reported as a truncated metered
    // size rather than panicking.
    #[test]
    fn metered_record_truncated_after_magic_byte_returns_error() {
        // Magic byte: Envelope (0b0000_0010), metered_size_varlen = 1 → expects 1 more byte.
        let truncated = Bytes::from_static(&[0b0000_0010]);
        let result: Result<Metered<Record>, _> = truncated.try_into();
        assert_eq!(result, Err(RecordDecodeError::Truncated("MeteredSize")));
    }

    // Big-endian reads of 3- and 4-byte windows over a buffer with a single set byte.
    #[test]
    fn test_read_varint() {
        let data = [0u8, 0, 0, 1, 0, 0, 0];

        assert_eq!(read_vint_u32_be(&data[..4]), 1u32);
        assert_eq!(read_vint_u32_be(&data[2..5]), 2u32.pow(8));
        assert_eq!(read_vint_u32_be(&data[2..6]), 2u32.pow(16));
        assert_eq!(read_vint_u32_be(&data[3..]), 2u32.pow(24));
    }
}