Skip to main content

s2_common/record/
mod.rs

1mod command;
2mod envelope;
3mod fencing;
4mod metering;
5
6use bytes::Bytes;
7pub use command::{CommandOp, CommandPayloadError, CommandRecord};
8pub use envelope::{EnvelopeRecord, HeaderValidationError};
9pub use fencing::{FencingToken, FencingTokenTooLongError, MAX_FENCING_TOKEN_LENGTH};
10pub use metering::{Metered, MeteredExt, MeteredSize};
11
12use crate::deep_size::DeepSize;
13
/// Sequence-number component of a [`StreamPosition`]; an alias for `u64`.
pub type SeqNum = u64;
/// Timestamp component of a [`StreamPosition`]; an alias for `u64`.
///
/// NOTE(review): the unit and epoch are not visible in this module — confirm.
pub type Timestamp = u64;
/// Alias for `NonZeroU64`; presumably a [`SeqNum`] that is known not to be zero.
pub type NonZeroSeqNum = std::num::NonZeroU64;
17
18#[derive(Debug, PartialEq, Eq, Clone, Copy)]
19pub struct StreamPosition {
20    pub seq_num: SeqNum,
21    pub timestamp: Timestamp,
22}
23
24impl StreamPosition {
25    pub const MIN: StreamPosition = StreamPosition {
26        seq_num: SeqNum::MIN,
27        timestamp: Timestamp::MIN,
28    };
29}
30
31impl std::fmt::Display for StreamPosition {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        write!(f, "{} @ {}", self.seq_num, self.timestamp)
34    }
35}
36
37impl DeepSize for StreamPosition {
38    fn deep_size(&self) -> usize {
39        self.seq_num.deep_size() + self.timestamp.deep_size()
40    }
41}
42
43#[derive(Debug, PartialEq, thiserror::Error)]
44pub enum RecordPartsError {
45    #[error("unknown command")]
46    UnknownCommand,
47    #[error("invalid `{0}` command: {1}")]
48    CommandPayload(CommandOp, CommandPayloadError),
49    #[error("invalid header: {0}")]
50    Header(#[from] HeaderValidationError),
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct Header {
55    pub name: Bytes,
56    pub value: Bytes,
57}
58
59impl DeepSize for Header {
60    fn deep_size(&self) -> usize {
61        self.name.len() + self.value.len()
62    }
63}
64
65impl MeteredSize for Record {
66    fn metered_size(&self) -> usize {
67        match self {
68            Self::Command(command) => command.metered_size(),
69            Self::Envelope(envelope) => envelope.metered_size(),
70        }
71    }
72}
73
74#[derive(Debug, PartialEq, Eq, Clone)]
75pub enum Record {
76    Command(CommandRecord),
77    Envelope(EnvelopeRecord),
78}
79
80impl DeepSize for Record {
81    fn deep_size(&self) -> usize {
82        match self {
83            Self::Command(c) => c.deep_size(),
84            Self::Envelope(e) => e.deep_size(),
85        }
86    }
87}
88
89impl Record {
90    pub fn try_from_parts(headers: Vec<Header>, body: Bytes) -> Result<Self, RecordPartsError> {
91        if headers.len() == 1 {
92            let header = &headers[0];
93            if header.name.is_empty() {
94                let op = CommandOp::from_id(header.value.as_ref())
95                    .ok_or(RecordPartsError::UnknownCommand)?;
96                let command_record = CommandRecord::try_from_parts(op, body.as_ref())
97                    .map_err(|e| RecordPartsError::CommandPayload(op, e))?;
98                return Ok(Self::Command(command_record));
99            }
100        }
101        let envelope = EnvelopeRecord::try_from_parts(headers, body)?;
102        Ok(Self::Envelope(envelope))
103    }
104
105    pub fn sequenced(self, position: StreamPosition) -> SequencedRecord {
106        Sequenced::new(position, self)
107    }
108
109    pub fn into_parts(self) -> (Vec<Header>, Bytes) {
110        match self {
111            Record::Envelope(e) => e.into_parts(),
112            Record::Command(c) => {
113                let op = c.op();
114                let header = Header {
115                    name: Bytes::new(),
116                    value: Bytes::from_static(op.to_id()),
117                };
118                (vec![header], c.payload())
119            }
120        }
121    }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct Sequenced<T> {
126    position: StreamPosition,
127    inner: T,
128}
129
130impl<T> Sequenced<T> {
131    pub const fn new(position: StreamPosition, inner: T) -> Self {
132        Self { position, inner }
133    }
134
135    pub const fn position(&self) -> &StreamPosition {
136        &self.position
137    }
138
139    pub fn inner(&self) -> &T {
140        &self.inner
141    }
142
143    pub fn as_ref(&self) -> Sequenced<&T> {
144        Sequenced::new(self.position, &self.inner)
145    }
146
147    pub fn parts(&self) -> (StreamPosition, &T) {
148        (self.position, &self.inner)
149    }
150
151    pub fn into_parts(self) -> (StreamPosition, T) {
152        (self.position, self.inner)
153    }
154}
155
/// A [`Record`] paired with a [`StreamPosition`].
pub type SequencedRecord = Sequenced<Record>;
157
158impl<T> MeteredSize for Sequenced<T>
159where
160    T: MeteredSize,
161{
162    fn metered_size(&self) -> usize {
163        self.inner.metered_size()
164    }
165}
166
167impl<T> DeepSize for Sequenced<T>
168where
169    T: DeepSize,
170{
171    fn deep_size(&self) -> usize {
172        self.position.deep_size() + self.inner.deep_size()
173    }
174}
175
176impl<T> Metered<T>
177where
178    T: MeteredSize,
179{
180    pub fn sequenced(self, position: StreamPosition) -> Metered<Sequenced<T>> {
181        Metered::with_size(
182            self.metered_size(),
183            Sequenced::new(position, self.into_inner()),
184        )
185    }
186}
187
188impl<T> Metered<Sequenced<T>> {
189    pub fn parts(&self) -> (StreamPosition, Metered<&T>) {
190        let size = self.metered_size();
191        let (position, inner) = self.as_ref().into_inner().parts();
192        (position, Metered::with_size(size, inner))
193    }
194
195    pub fn into_parts(self) -> (StreamPosition, Metered<T>) {
196        let size = self.metered_size();
197        let (position, inner) = self.into_inner().into_parts();
198        (position, Metered::with_size(size, inner))
199    }
200}
201
#[cfg(test)]
mod test {
    use rstest::rstest;

    use super::*;

    /// Recomputes the expected metered size from a record's parts: 8 fixed
    /// bytes, 2 bytes per header, every header name/value byte, and the body
    /// length.
    fn semantic_metered_size(record: &Record) -> usize {
        let (headers, body) = record.clone().into_parts();
        let header_bytes = headers
            .iter()
            .map(|h| h.name.len() + h.value.len())
            .sum::<usize>();
        8 + (2 * headers.len()) + header_bytes + body.len()
    }

    /// A lone empty-named header is read as a command id, so an unrecognised
    /// value must be rejected as an unknown command.
    #[test]
    fn empty_header_name_solo() {
        let nameless = Header {
            name: Bytes::new(),
            value: Bytes::from("hi"),
        };
        let outcome = Record::try_from_parts(vec![nameless], Bytes::from("hello"));
        assert_eq!(outcome, Err(RecordPartsError::UnknownCommand));
    }

    /// With more than one header the input goes down the envelope path, where
    /// an empty header name is a validation error.
    #[test]
    fn empty_header_name_among_others() {
        let named = Header {
            name: Bytes::from("boku"),
            value: Bytes::from("hi"),
        };
        let nameless = Header {
            name: Bytes::new(),
            value: Bytes::from("hi"),
        };
        let outcome = Record::try_from_parts(vec![named, nameless], Bytes::from("hello"));
        assert_eq!(
            outcome,
            Err(RecordPartsError::Header(HeaderValidationError::NameEmpty))
        );
    }

    /// Builds the header/body pair that encodes command `op` with `payload`.
    fn command_parts(op: &'static [u8], payload: &'static [u8]) -> (Vec<Header>, Bytes) {
        let marker = Header {
            name: Bytes::new(),
            value: Bytes::from_static(op),
        };
        (vec![marker], Bytes::from_static(payload))
    }

    /// Parses a command record and checks its op, payload, metered size, and
    /// that sequencing preserves both the record and its metered size.
    fn assert_valid_command_record(op: &'static [u8], payload: &'static [u8]) {
        let position = StreamPosition {
            seq_num: 42,
            timestamp: 100_000,
        };
        let (headers, body) = command_parts(op, payload);
        let record = Record::try_from_parts(headers.clone(), body.clone()).unwrap();
        let expected_size = record.metered_size();
        match &record {
            Record::Command(command) => {
                assert_eq!(command.op().to_id(), op);
                assert_eq!(command.payload().as_ref(), payload);
            }
            other => panic!("Command expected, got {other:?}"),
        }
        assert_eq!(expected_size, semantic_metered_size(&record));

        let sequenced = record.clone().sequenced(position);
        assert_eq!(expected_size, sequenced.metered_size());
        assert_eq!(sequenced.position, position);
        assert_eq!(
            sequenced.inner,
            Record::try_from_parts(headers, body).unwrap()
        );
    }

    #[rstest]
    #[case::fence_empty(b"fence", b"")]
    #[case::fence_uuid(b"fence", b"my-special-uuid")]
    #[case::trim_0(b"trim", b"\x00\x00\x00\x00\x00\x00\x00\x00")]
    fn valid_command_records(#[case] op: &'static [u8], #[case] payload: &'static [u8]) {
        assert_valid_command_record(op, payload);
    }

    #[rstest]
    #[case::fence_too_long(
        b"fence",
        b"toolongtoolongtoolongtoolongtoolongtoolongtoolong",
        RecordPartsError::CommandPayload(
            CommandOp::Fence,
            CommandPayloadError::FencingTokenTooLong(FencingTokenTooLongError(49)),
        )
    )]
    #[case::trim_empty(
        b"trim",
        b"",
        RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(0))
    )]
    #[case::trim_overflow(
        b"trim",
        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00",
        RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(9))
    )]
    fn invalid_command_records(
        #[case] op: &'static [u8],
        #[case] payload: &'static [u8],
        #[case] expected: RecordPartsError,
    ) {
        let (headers, body) = command_parts(op, payload);
        let outcome = Record::try_from_parts(headers, body);
        assert_eq!(outcome, Err(expected));
    }
}
325}