1mod batcher;
2mod command;
3mod encryption;
4mod envelope;
5mod fencing;
6mod iterator;
7mod metering;
8
9pub use batcher::{RecordBatch, RecordBatcher};
10use bytes::{Buf, BufMut, Bytes, BytesMut};
11pub use command::CommandRecord;
12use command::{CommandOp, CommandPayloadError};
13pub use encryption::{
14 EncryptedRecord, RecordDecryptionError, decrypt_stored_record, encrypt_record,
15};
16pub use envelope::EnvelopeRecord;
17use envelope::HeaderValidationError;
18pub use fencing::{FencingToken, FencingTokenTooLongError, MAX_FENCING_TOKEN_LENGTH};
19pub use iterator::StoredRecordIterator;
20pub use metering::{Metered, MeteredExt, MeteredSize};
21use strum::FromRepr;
22
23use crate::deep_size::DeepSize;
24
/// Integer type of the `seq_num` component of a [`StreamPosition`].
pub type SeqNum = u64;
/// Non-zero 64-bit counterpart of [`SeqNum`].
pub type NonZeroSeqNum = std::num::NonZeroU64;
// NOTE(review): the unit and epoch of the timestamp are not visible in this file — confirm.
/// Integer type of the `timestamp` component of a [`StreamPosition`].
pub type Timestamp = u64;
28
/// A position in a stream: a sequence number paired with a timestamp.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct StreamPosition {
    /// Sequence number component of the position.
    pub seq_num: SeqNum,
    /// Timestamp component of the position.
    pub timestamp: Timestamp,
}
34
35impl StreamPosition {
36 pub const MIN: StreamPosition = StreamPosition {
37 seq_num: SeqNum::MIN,
38 timestamp: Timestamp::MIN,
39 };
40}
41
42impl std::fmt::Display for StreamPosition {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 write!(f, "{} @ {}", self.seq_num, self.timestamp)
45 }
46}
47
48impl DeepSize for StreamPosition {
49 fn deep_size(&self) -> usize {
50 self.seq_num.deep_size() + self.timestamp.deep_size()
51 }
52}
53
/// Errors raised while decoding a record from its encoded bytes.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum RecordDecodeError {
    /// The input ended before the named component (e.g. `"MagicByte"`,
    /// `"MeteredSize"`) could be read.
    #[error("truncated: {0}")]
    Truncated(&'static str),
    /// The named component (first field) held an unacceptable value; the
    /// second field carries the reason.
    #[error("invalid value [{0}]: {1}")]
    InvalidValue(&'static str, &'static str),
}
61
/// Errors raised when assembling a [`Record`] from headers and a body
/// (see `Record::try_from_parts`).
#[derive(Debug, PartialEq, thiserror::Error)]
pub enum RecordPartsError {
    /// The single header had an empty name, but its value is not a known
    /// command identifier.
    #[error("unknown command")]
    UnknownCommand,
    /// The body was rejected as a payload for the given command operation.
    #[error("invalid `{0}` command: {1}")]
    CommandPayload(CommandOp, CommandPayloadError),
    /// Header validation failed; converted automatically from
    /// `HeaderValidationError`.
    #[error("invalid header: {0}")]
    Header(#[from] HeaderValidationError),
}
71
/// A record header: a name/value pair of raw bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    /// Header name. An empty name on a lone header marks a command record
    /// (see `Record::try_from_parts`).
    pub name: Bytes,
    /// Header value.
    pub value: Bytes,
}
77
78impl DeepSize for Header {
79 fn deep_size(&self) -> usize {
80 self.name.len() + self.value.len()
81 }
82}
83
/// Kind of an encoded record, stored in the low three bits of the magic byte.
#[derive(Clone, Copy, Debug, PartialEq, FromRepr)]
#[repr(u8)]
pub enum RecordType {
    /// A plaintext command record.
    Command = 1,
    /// A plaintext envelope record.
    Envelope = 2,
    /// An encrypted record.
    EncryptedEnvelope = 3,
}
91
/// Decoded form of the first byte of an encoded record.
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct MagicByte {
    /// Kind of record that follows.
    pub record_type: RecordType,
    /// Number of bytes (1 to 3 when parsed from a byte) used for the
    /// metered-size field that follows the magic byte.
    pub metered_size_varlen: u8,
}
97
/// Interprets `bytes` (1 to 4 bytes) as a big-endian unsigned integer.
///
/// # Panics
///
/// Panics when `bytes` is empty or longer than a `u32`.
fn read_vint_u32_be(bytes: &[u8]) -> u32 {
    let len = bytes.len();
    assert!(
        (1..=size_of::<u32>()).contains(&len),
        "invalid variable int bytes = {} len",
        len
    );
    // Most significant byte first: shift the running value up and append.
    bytes
        .iter()
        .fold(0u32, |acc, &byte| (acc << 8) | u32::from(byte))
}
109
110pub fn try_metered_size(record_bytes: &[u8]) -> Result<u32, &'static str> {
111 let magic_byte_u8 = *record_bytes.first().ok_or("byte range is empty")?;
112 let magic_byte = MagicByte::try_from(magic_byte_u8)?;
113 Ok(read_vint_u32_be(
114 record_bytes
115 .get(1..1 + magic_byte.metered_size_varlen as usize)
116 .ok_or("byte range doesn't include bytes for metered size")?,
117 ))
118}
119
120impl MeteredSize for Record {
121 fn metered_size(&self) -> usize {
122 match self {
123 Self::Command(command) => command.metered_size(),
124 Self::Envelope(envelope) => envelope.metered_size(),
125 }
126 }
127}
128
129impl TryFrom<u8> for MagicByte {
130 type Error = &'static str;
131
132 fn try_from(value: u8) -> Result<Self, Self::Error> {
133 let record_type =
134 RecordType::from_repr(value & 0b111).ok_or("invalid record type ordinal")?;
135 Ok(Self {
136 record_type,
137 metered_size_varlen: match (value >> 3) & 0b11 {
138 0 => 1u8,
139 1 => 2u8,
140 2 => 3u8,
141 _ => Err("invalid metered_size_varlen")?,
142 },
143 })
144 }
145}
146
147impl From<MagicByte> for u8 {
148 fn from(value: MagicByte) -> Self {
149 ((value.metered_size_varlen - 1) << 3) | value.record_type as u8
150 }
151}
152
/// A plaintext record: either a command or an envelope.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Record {
    /// A command record.
    Command(CommandRecord),
    /// An envelope record (headers plus body).
    Envelope(EnvelopeRecord),
}
158
159impl DeepSize for Record {
160 fn deep_size(&self) -> usize {
161 match self {
162 Self::Command(c) => c.deep_size(),
163 Self::Envelope(e) => e.deep_size(),
164 }
165 }
166}
167
168impl Record {
169 pub fn try_from_parts(headers: Vec<Header>, body: Bytes) -> Result<Self, RecordPartsError> {
170 if headers.len() == 1 {
171 let header = &headers[0];
172 if header.name.is_empty() {
173 let op = CommandOp::from_id(header.value.as_ref())
174 .ok_or(RecordPartsError::UnknownCommand)?;
175 let command_record = CommandRecord::try_from_parts(op, body.as_ref())
176 .map_err(|e| RecordPartsError::CommandPayload(op, e))?;
177 return Ok(Self::Command(command_record));
178 }
179 }
180 let envelope = EnvelopeRecord::try_from_parts(headers, body)?;
181 Ok(Self::Envelope(envelope))
182 }
183
184 pub fn sequenced(self, position: StreamPosition) -> SequencedRecord {
185 Sequenced::new(position, self)
186 }
187
188 pub fn into_parts(self) -> (Vec<Header>, Bytes) {
189 match self {
190 Record::Envelope(e) => e.into_parts(),
191 Record::Command(c) => {
192 let op = c.op();
193 let header = Header {
194 name: Bytes::new(),
195 value: Bytes::from_static(op.to_id()),
196 };
197 (vec![header], c.payload())
198 }
199 }
200 }
201}
202
/// A record as it is stored: either plaintext or encrypted.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum StoredRecord {
    /// A plaintext [`Record`].
    Plaintext(Record),
    /// An encrypted record together with its metered size.
    Encrypted {
        /// Value reported by `metered_size()` for this record.
        metered_size: usize,
        /// The encrypted payload.
        record: EncryptedRecord,
    },
}
211
212impl StoredRecord {
213 pub(crate) fn encrypted(record: EncryptedRecord, metered_size: usize) -> Self {
214 Self::Encrypted {
215 metered_size,
216 record,
217 }
218 }
219
220 fn record_type(&self) -> RecordType {
221 match self {
222 Self::Plaintext(Record::Command(_)) => RecordType::Command,
223 Self::Plaintext(Record::Envelope(_)) => RecordType::Envelope,
224 Self::Encrypted { .. } => RecordType::EncryptedEnvelope,
225 }
226 }
227
228 fn encoded_body_size(&self) -> usize {
229 match self {
230 Self::Plaintext(Record::Command(record)) => record.encoded_size(),
231 Self::Plaintext(Record::Envelope(record)) => record.encoded_size(),
232 Self::Encrypted { record, .. } => record.encoded_size(),
233 }
234 }
235
236 fn encode_body_into(&self, buf: &mut impl BufMut) {
237 match self {
238 Self::Plaintext(Record::Command(record)) => record.encode_into(buf),
239 Self::Plaintext(Record::Envelope(record)) => record.encode_into(buf),
240 Self::Encrypted { record, .. } => record.encode_into(buf),
241 }
242 }
243
244 pub fn encryption_algorithm(&self) -> Option<crate::encryption::EncryptionAlgorithm> {
245 match self {
246 Self::Plaintext(_) => None,
247 Self::Encrypted { record, .. } => Some(record.algorithm()),
248 }
249 }
250
251 pub fn max_assignable_seq_num(&self) -> SeqNum {
252 match self {
253 Self::Plaintext(_) => SeqNum::MAX,
254 Self::Encrypted { record, .. } => record.max_assignable_seq_num(),
255 }
256 }
257}
258
259impl DeepSize for StoredRecord {
260 fn deep_size(&self) -> usize {
261 match self {
262 Self::Plaintext(record) => record.deep_size(),
263 Self::Encrypted {
264 metered_size,
265 record,
266 } => metered_size.deep_size() + record.deep_size(),
267 }
268 }
269}
270
271impl MeteredSize for StoredRecord {
272 fn metered_size(&self) -> usize {
273 match self {
274 Self::Plaintext(record) => record.metered_size(),
275 Self::Encrypted { metered_size, .. } => *metered_size,
276 }
277 }
278}
279
280impl From<Record> for StoredRecord {
281 fn from(value: Record) -> Self {
282 Self::Plaintext(value)
283 }
284}
285
286impl From<Record> for Metered<StoredRecord> {
287 fn from(value: Record) -> Self {
288 Self::from(StoredRecord::from(value))
289 }
290}
291
292pub fn decode_if_command_record(record: &[u8]) -> Result<Option<CommandRecord>, RecordDecodeError> {
293 if record.is_empty() {
294 return Err(RecordDecodeError::Truncated("MagicByte"));
295 }
296 let magic_byte = MagicByte::try_from(record[0])
297 .map_err(|msg| RecordDecodeError::InvalidValue("MagicByte", msg))?;
298 match magic_byte.record_type {
299 RecordType::Command => {
300 let offset = 1 + magic_byte.metered_size_varlen as usize;
301 if record.len() < offset {
302 return Err(RecordDecodeError::Truncated("MeteredSize"));
303 }
304 Ok(Some(CommandRecord::try_from(&record[offset..])?))
305 }
306 RecordType::Envelope | RecordType::EncryptedEnvelope => Ok(None),
307 }
308}
309
310pub trait Encodable {
311 fn to_bytes(&self) -> Bytes {
312 let expected_size = self.encoded_size();
313 let mut buf = BytesMut::with_capacity(expected_size);
314 self.encode_into(&mut buf);
315 assert_eq!(buf.len(), expected_size, "no reallocation");
316 buf.freeze()
317 }
318
319 fn encoded_size(&self) -> usize;
320
321 fn encode_into(&self, buf: &mut impl BufMut);
322}
323
324impl Encodable for Metered<&StoredRecord> {
325 fn encoded_size(&self) -> usize {
326 1 + self.magic_byte().metered_size_varlen as usize + self.encoded_body_size()
327 }
328
329 fn encode_into(&self, buf: &mut impl BufMut) {
330 let magic_byte = self.magic_byte();
331 buf.put_u8(magic_byte.into());
332 buf.put_uint(
333 self.metered_size() as u64,
334 magic_byte.metered_size_varlen as usize,
335 );
336 self.encode_body_into(buf);
337 }
338}
339
/// A value paired with the [`StreamPosition`] assigned to it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Sequenced<T> {
    /// Position assigned to `inner`.
    position: StreamPosition,
    /// The wrapped value.
    inner: T,
}
345
346impl<T> Sequenced<T> {
347 pub const fn new(position: StreamPosition, inner: T) -> Self {
348 Self { position, inner }
349 }
350
351 pub const fn position(&self) -> &StreamPosition {
352 &self.position
353 }
354
355 pub fn inner(&self) -> &T {
356 &self.inner
357 }
358
359 pub fn as_ref(&self) -> Sequenced<&T> {
360 Sequenced::new(self.position, &self.inner)
361 }
362
363 pub fn parts(&self) -> (StreamPosition, &T) {
364 (self.position, &self.inner)
365 }
366
367 pub fn into_parts(self) -> (StreamPosition, T) {
368 (self.position, self.inner)
369 }
370}
371
/// Raw bytes paired with a stream position.
pub type StoredSequencedBytes = Sequenced<Bytes>;
/// A plaintext [`Record`] paired with a stream position.
pub type SequencedRecord = Sequenced<Record>;
/// A [`StoredRecord`] paired with a stream position.
pub type StoredSequencedRecord = Sequenced<StoredRecord>;
375
376impl<T> MeteredSize for Sequenced<T>
377where
378 T: MeteredSize,
379{
380 fn metered_size(&self) -> usize {
381 self.inner.metered_size()
382 }
383}
384
385impl<T> DeepSize for Sequenced<T>
386where
387 T: DeepSize,
388{
389 fn deep_size(&self) -> usize {
390 self.position.deep_size() + self.inner.deep_size()
391 }
392}
393
394impl<T> Metered<T>
395where
396 T: MeteredSize,
397{
398 pub fn sequenced(self, position: StreamPosition) -> Metered<Sequenced<T>> {
399 Metered::with_size(
400 self.metered_size(),
401 Sequenced::new(position, self.into_inner()),
402 )
403 }
404}
405
406impl Metered<&StoredRecord> {
407 fn magic_byte(&self) -> MagicByte {
408 let metered_size = self.metered_size();
409 let metered_size_varlen = 8 - (metered_size.leading_zeros() / 8) as u8;
410 if metered_size_varlen > 3 {
411 panic!("illegal metered size varlen {metered_size} for record")
412 }
413 MagicByte {
414 record_type: self.record_type(),
415 metered_size_varlen,
416 }
417 }
418}
419
420impl TryFrom<Bytes> for Metered<StoredRecord> {
421 type Error = RecordDecodeError;
422
423 fn try_from(mut buf: Bytes) -> Result<Self, Self::Error> {
424 if buf.is_empty() {
425 return Err(RecordDecodeError::Truncated("MagicByte"));
426 }
427 let magic_byte = MagicByte::try_from(buf.get_u8())
428 .map_err(|msg| RecordDecodeError::InvalidValue("MagicByte", msg))?;
429
430 let metered_size =
431 buf.try_get_uint(magic_byte.metered_size_varlen as usize)
432 .map_err(|_| RecordDecodeError::Truncated("MeteredSize"))? as usize;
433
434 Ok(Self::with_size(
435 metered_size,
436 match magic_byte.record_type {
437 RecordType::Command => {
438 StoredRecord::Plaintext(Record::Command(CommandRecord::try_from(buf.as_ref())?))
439 }
440 RecordType::Envelope => {
441 StoredRecord::Plaintext(Record::Envelope(EnvelopeRecord::try_from(buf)?))
442 }
443 RecordType::EncryptedEnvelope => {
444 StoredRecord::encrypted(EncryptedRecord::try_from(buf)?, metered_size)
445 }
446 },
447 ))
448 }
449}
450
451impl TryFrom<Bytes> for Metered<Record> {
452 type Error = RecordDecodeError;
453
454 fn try_from(buf: Bytes) -> Result<Self, Self::Error> {
455 let stored: Metered<StoredRecord> = buf.try_into()?;
456 let size = stored.metered_size();
457 match stored.into_inner() {
458 StoredRecord::Plaintext(record) => Ok(record),
459 StoredRecord::Encrypted { .. } => Err(RecordDecodeError::InvalidValue(
460 "RecordType",
461 "encrypted envelope requires decryption",
462 )),
463 }
464 .map(|record| Metered::with_size(size, record))
465 }
466}
467
468impl<T> Metered<Sequenced<T>> {
469 pub fn parts(&self) -> (StreamPosition, Metered<&T>) {
470 let size = self.metered_size();
471 let (position, inner) = self.as_ref().into_inner().parts();
472 (position, Metered::with_size(size, inner))
473 }
474
475 pub fn into_parts(self) -> (StreamPosition, Metered<T>) {
476 let size = self.metered_size();
477 let (position, inner) = self.into_inner().into_parts();
478 (position, Metered::with_size(size, inner))
479 }
480}
481
482#[cfg(test)]
483mod test {
484 use proptest::prelude::*;
485 use rstest::rstest;
486
487 use super::*;
488
    /// Test-only encoder that frames a plaintext `Record` directly. Its output
    /// is compared against the `Metered<&StoredRecord>` encoding in the tests
    /// below.
    struct LegacyPlaintextFrame<'a> {
        record: &'a Record,
    }
492
493 impl LegacyPlaintextFrame<'_> {
494 fn magic_byte(&self) -> MagicByte {
495 let metered_size = self.record.metered_size();
496 let metered_size_varlen = 8 - (metered_size.leading_zeros() / 8) as u8;
497 assert!(metered_size_varlen <= 3);
498
499 MagicByte {
500 record_type: match self.record {
501 Record::Command(_) => RecordType::Command,
502 Record::Envelope(_) => RecordType::Envelope,
503 },
504 metered_size_varlen,
505 }
506 }
507 }
508
509 impl Encodable for LegacyPlaintextFrame<'_> {
510 fn encoded_size(&self) -> usize {
511 let body_size = match self.record {
512 Record::Command(record) => record.encoded_size(),
513 Record::Envelope(record) => record.encoded_size(),
514 };
515 1 + self.magic_byte().metered_size_varlen as usize + body_size
516 }
517
518 fn encode_into(&self, buf: &mut impl BufMut) {
519 let magic_byte = self.magic_byte();
520 buf.put_u8(magic_byte.into());
521 buf.put_uint(
522 self.record.metered_size() as u64,
523 magic_byte.metered_size_varlen as usize,
524 );
525 match self.record {
526 Record::Command(record) => record.encode_into(buf),
527 Record::Envelope(record) => record.encode_into(buf),
528 }
529 }
530 }
531
532 fn legacy_plaintext_bytes(record: &Record) -> Bytes {
533 LegacyPlaintextFrame { record }.to_bytes()
534 }
535
536 fn semantic_metered_size(record: &Record) -> usize {
537 let (headers, body) = record.clone().into_parts();
538 8 + (2 * headers.len())
539 + headers
540 .iter()
541 .map(|header| header.name.len() + header.value.len())
542 .sum::<usize>()
543 + body.len()
544 }
545
546 fn bytes_strategy(allow_empty: bool) -> impl Strategy<Value = Bytes> {
547 prop_oneof![
548 prop::collection::vec(any::<u8>(), (if allow_empty { 0 } else { 1 })..10)
549 .prop_map(Bytes::from),
550 prop::collection::vec(any::<u8>(), 100..1000).prop_map(Bytes::from),
551 ]
552 }
553
554 fn header_strategy() -> impl Strategy<Value = Header> {
555 (bytes_strategy(false), bytes_strategy(true))
556 .prop_map(|(name, value)| Header { name, value })
557 }
558
559 fn headers_strategy() -> impl Strategy<Value = Vec<Header>> {
560 prop_oneof![
561 prop::collection::vec(header_strategy(), 0..10),
562 prop::collection::vec(header_strategy(), 200..300),
563 ]
564 }
565
566 fn command_strategy() -> impl Strategy<Value = CommandRecord> {
567 prop_oneof![
568 proptest::string::string_regex(&format!("[ -~]{{0,{MAX_FENCING_TOKEN_LENGTH}}}"))
569 .unwrap()
570 .prop_map(|token| CommandRecord::Fence(token.parse().unwrap())),
571 any::<SeqNum>().prop_map(CommandRecord::Trim),
572 ]
573 }
574
575 proptest!(
576 #![proptest_config(ProptestConfig::with_cases(10))]
577 #[test]
578 fn roundtrip_envelope(
579 seq_num in any::<SeqNum>(),
580 timestamp in any::<Timestamp>(),
581 headers in headers_strategy(),
582 body in bytes_strategy(true),
583 ) {
584 let record = Record::try_from_parts(headers, body).unwrap();
585 let metered_record: Metered<Record> = record.clone().into();
586 let encoded_record = Metered::from(StoredRecord::from(record.clone()))
587 .as_ref()
588 .to_bytes();
589 let legacy_record = legacy_plaintext_bytes(&record);
590 prop_assert_eq!(encoded_record.as_ref(), legacy_record.as_ref());
591 let decoded_record = Metered::try_from(encoded_record).unwrap();
592 prop_assert_eq!(&decoded_record, &metered_record);
593 let sequenced = decoded_record.sequenced(StreamPosition { seq_num, timestamp });
594 let (position, sequenced_record) = sequenced.into_parts();
595 assert_eq!(position, StreamPosition { seq_num, timestamp });
596 assert_eq!(sequenced_record.into_inner(), record);
597 }
598 );
599
600 proptest!(
601 #![proptest_config(ProptestConfig::with_cases(10))]
602 #[test]
603 fn roundtrip_metered(
604 headers in headers_strategy(),
605 body in bytes_strategy(true),
606 ) {
607 let record = Record::try_from_parts(headers.clone(), body.clone()).unwrap();
608 let encoded_record = Metered::from(StoredRecord::from(record.clone()))
609 .as_ref()
610 .to_bytes();
611 assert_eq!(record.metered_size(), semantic_metered_size(&record));
612 assert_eq!(record.metered_size(), try_metered_size(encoded_record.as_ref()).unwrap() as usize);
613 }
614 );
615
616 proptest!(
617 #![proptest_config(ProptestConfig::with_cases(10))]
618 #[test]
619 fn roundtrip_command_metered(command in command_strategy()) {
620 let record = Record::Command(command);
621 let encoded_record = Metered::from(StoredRecord::from(record.clone()))
622 .as_ref()
623 .to_bytes();
624 let expected_metered = semantic_metered_size(&record);
625 let wire_metered = try_metered_size(encoded_record.as_ref()).unwrap() as usize;
626 let decoded_record: Metered<Record> = Metered::try_from(encoded_record).unwrap();
627
628 assert_eq!(record.metered_size(), expected_metered);
629 assert_eq!(record.metered_size(), wire_metered);
630 prop_assert_eq!(decoded_record, Metered::<Record>::from(record));
631 }
632 );
633
634 #[test]
635 fn roundtrip_encrypted_stored_record() {
636 let mut encoded = BytesMut::with_capacity(1 + 12 + 10 + 16);
637 encoded.put_u8(0x02);
638 encoded.put_slice(b"0123456789ab");
639 encoded.put_slice(b"ciphertext");
640 encoded.put_slice(b"0123456789abcdef");
641 let record =
642 StoredRecord::encrypted(EncryptedRecord::try_from(encoded.freeze()).unwrap(), 123);
643 let metered_record: Metered<StoredRecord> = record.clone().into();
644 let encoded_record = metered_record.as_ref().to_bytes();
645 let decoded_record = Metered::try_from(encoded_record).unwrap();
646 assert_eq!(decoded_record, metered_record);
647 }
648
649 #[test]
650 fn empty_header_name_solo() {
651 let headers = vec![Header {
652 name: Bytes::new(),
653 value: Bytes::from("hi"),
654 }];
655 let body = Bytes::from("hello");
656 assert_eq!(
657 Record::try_from_parts(headers, body),
658 Err(RecordPartsError::UnknownCommand)
659 );
660 }
661
662 #[test]
663 fn empty_header_name_among_others() {
664 let headers = vec![
665 Header {
666 name: Bytes::from("boku"),
667 value: Bytes::from("hi"),
668 },
669 Header {
670 name: Bytes::new(),
671 value: Bytes::from("hi"),
672 },
673 ];
674 let body = Bytes::from("hello");
675 assert_eq!(
676 Record::try_from_parts(headers, body),
677 Err(RecordPartsError::Header(HeaderValidationError::NameEmpty))
678 );
679 }
680
681 fn command_parts(op: &'static [u8], payload: &'static [u8]) -> (Vec<Header>, Bytes) {
682 let headers = vec![Header {
683 name: Bytes::new(),
684 value: Bytes::from_static(op),
685 }];
686 let body = Bytes::from_static(payload);
687 (headers, body)
688 }
689
690 fn assert_valid_command_record(op: &'static [u8], payload: &'static [u8]) {
691 let (headers, body) = command_parts(op, payload);
692 let record = Record::try_from_parts(headers.clone(), body.clone()).unwrap();
693 let record_metered = record.metered_size();
694 match &record {
695 Record::Command(cmd) => {
696 assert_eq!(cmd.op().to_id(), op);
697 assert_eq!(cmd.payload().as_ref(), payload);
698 }
699 other => panic!("Command expected, got {other:?}"),
700 }
701 let encoded_record = Metered::from(StoredRecord::from(record.clone()))
702 .as_ref()
703 .to_bytes();
704 assert_eq!(record_metered, semantic_metered_size(&record));
705 assert_eq!(
706 record_metered,
707 try_metered_size(encoded_record.as_ref()).unwrap() as usize
708 );
709 assert_eq!(
710 encoded_record.as_ref(),
711 legacy_plaintext_bytes(&record).as_ref()
712 );
713 let sequenced_record = record.clone().sequenced(StreamPosition {
714 seq_num: 42,
715 timestamp: 100_000,
716 });
717 let sequenced_metered = sequenced_record.metered_size();
718 assert_eq!(record_metered, sequenced_metered);
719 assert_eq!(
720 sequenced_record.position,
721 StreamPosition {
722 seq_num: 42,
723 timestamp: 100_000,
724 }
725 );
726 assert_eq!(
727 sequenced_record.inner,
728 Record::try_from_parts(headers, body).unwrap()
729 );
730 }
731
    // Each case is an (op id, payload) pair that must be accepted as a command
    // record and pass every check in `assert_valid_command_record`.
    #[rstest]
    #[case::fence_empty(b"fence", b"")]
    #[case::fence_uuid(b"fence", b"my-special-uuid")]
    #[case::trim_0(b"trim", b"\x00\x00\x00\x00\x00\x00\x00\x00")]
    fn valid_command_records(#[case] op: &'static [u8], #[case] payload: &'static [u8]) {
        assert_valid_command_record(op, payload);
    }
739
    // Each case is a known op id with a payload that must be rejected, plus
    // the exact error expected from `Record::try_from_parts`.
    #[rstest]
    #[case::fence_too_long(
        b"fence",
        b"toolongtoolongtoolongtoolongtoolongtoolongtoolong",
        RecordPartsError::CommandPayload(
            CommandOp::Fence,
            CommandPayloadError::FencingTokenTooLong(FencingTokenTooLongError(49)),
        )
    )]
    #[case::trim_empty(
        b"trim",
        b"",
        RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(0),)
    )]
    #[case::trim_overflow(
        b"trim",
        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00",
        RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(9),)
    )]
    fn invalid_command_records(
        #[case] op: &'static [u8],
        #[case] payload: &'static [u8],
        #[case] expected: RecordPartsError,
    ) {
        let (headers, body) = command_parts(op, payload);
        assert_eq!(Record::try_from_parts(headers, body), Err(expected));
    }
767
    // Each case pairs a raw byte with its parsed form: the low three bits are
    // the record type, bits 3-4 are the size-field length minus one. The
    // conversion is checked in both directions.
    #[rstest]
    #[case(0b0000_0010, MagicByte { record_type: RecordType::Envelope, metered_size_varlen: 1})]
    #[case(0b0001_0010, MagicByte { record_type: RecordType::Envelope, metered_size_varlen: 3})]
    #[case(0b0000_0011, MagicByte { record_type: RecordType::EncryptedEnvelope, metered_size_varlen: 1})]
    #[case(0b0000_1001, MagicByte { record_type: RecordType::Command, metered_size_varlen: 2})]
    fn valid_magic_byte_parsing(#[case] as_u8: u8, #[case] magic_byte: MagicByte) {
        assert_eq!(MagicByte::try_from(as_u8).unwrap(), magic_byte);
        assert_eq!(u8::from(magic_byte), as_u8);
    }
777
    // First case: the low three bits are 0b101, which is not a record type.
    // Second case: bits 3-4 are 0b11, which is not a valid size-field length.
    #[rstest]
    #[case(0b0000_1101, "invalid record type ordinal")]
    #[case(0b0001_1001, "invalid metered_size_varlen")]
    fn invalid_magic_byte_parsing(#[case] as_u8: u8, #[case] expected: &'static str) {
        assert_eq!(MagicByte::try_from(as_u8), Err(expected));
    }
784
785 #[test]
786 fn metered_record_truncated_after_magic_byte_returns_error() {
787 let truncated = Bytes::from_static(&[0b0000_0010]);
789 let result: Result<Metered<Record>, _> = truncated.try_into();
790 assert_eq!(result, Err(RecordDecodeError::Truncated("MeteredSize")));
791 }
792
793 #[test]
794 fn test_read_varint() {
795 let data = [0u8, 0, 0, 1, 0, 0, 0];
796
797 assert_eq!(read_vint_u32_be(&data[..4]), 1u32);
798 assert_eq!(read_vint_u32_be(&data[2..5]), 2u32.pow(8));
799 assert_eq!(read_vint_u32_be(&data[2..6]), 2u32.pow(16));
800 assert_eq!(read_vint_u32_be(&data[3..]), 2u32.pow(24));
801 }
802}