1mod command;
2mod envelope;
3mod fencing;
4mod metering;
5
6use bytes::Bytes;
7pub use command::{CommandOp, CommandPayloadError, CommandRecord};
8pub use envelope::{EnvelopeRecord, HeaderValidationError};
9pub use fencing::{FencingToken, FencingTokenTooLongError, MAX_FENCING_TOKEN_LENGTH};
10pub use metering::{Metered, MeteredExt, MeteredSize};
11
12use crate::deep_size::DeepSize;
13
14pub type SeqNum = u64;
15pub type NonZeroSeqNum = std::num::NonZeroU64;
16pub type Timestamp = u64;
17
18#[derive(Debug, PartialEq, Eq, Clone, Copy)]
19pub struct StreamPosition {
20 pub seq_num: SeqNum,
21 pub timestamp: Timestamp,
22}
23
24impl StreamPosition {
25 pub const MIN: StreamPosition = StreamPosition {
26 seq_num: SeqNum::MIN,
27 timestamp: Timestamp::MIN,
28 };
29}
30
31impl std::fmt::Display for StreamPosition {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 write!(f, "{} @ {}", self.seq_num, self.timestamp)
34 }
35}
36
37impl DeepSize for StreamPosition {
38 fn deep_size(&self) -> usize {
39 self.seq_num.deep_size() + self.timestamp.deep_size()
40 }
41}
42
43#[derive(Debug, PartialEq, thiserror::Error)]
44pub enum RecordPartsError {
45 #[error("unknown command")]
46 UnknownCommand,
47 #[error("invalid `{0}` command: {1}")]
48 CommandPayload(CommandOp, CommandPayloadError),
49 #[error("invalid header: {0}")]
50 Header(#[from] HeaderValidationError),
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct Header {
55 pub name: Bytes,
56 pub value: Bytes,
57}
58
59impl DeepSize for Header {
60 fn deep_size(&self) -> usize {
61 self.name.len() + self.value.len()
62 }
63}
64
65impl MeteredSize for Record {
66 fn metered_size(&self) -> usize {
67 match self {
68 Self::Command(command) => command.metered_size(),
69 Self::Envelope(envelope) => envelope.metered_size(),
70 }
71 }
72}
73
74#[derive(Debug, PartialEq, Eq, Clone)]
75pub enum Record {
76 Command(CommandRecord),
77 Envelope(EnvelopeRecord),
78}
79
80impl DeepSize for Record {
81 fn deep_size(&self) -> usize {
82 match self {
83 Self::Command(c) => c.deep_size(),
84 Self::Envelope(e) => e.deep_size(),
85 }
86 }
87}
88
89impl Record {
90 pub fn try_from_parts(headers: Vec<Header>, body: Bytes) -> Result<Self, RecordPartsError> {
91 if headers.len() == 1 {
92 let header = &headers[0];
93 if header.name.is_empty() {
94 let op = CommandOp::from_id(header.value.as_ref())
95 .ok_or(RecordPartsError::UnknownCommand)?;
96 let command_record = CommandRecord::try_from_parts(op, body.as_ref())
97 .map_err(|e| RecordPartsError::CommandPayload(op, e))?;
98 return Ok(Self::Command(command_record));
99 }
100 }
101 let envelope = EnvelopeRecord::try_from_parts(headers, body)?;
102 Ok(Self::Envelope(envelope))
103 }
104
105 pub fn sequenced(self, position: StreamPosition) -> SequencedRecord {
106 Sequenced::new(position, self)
107 }
108
109 pub fn into_parts(self) -> (Vec<Header>, Bytes) {
110 match self {
111 Record::Envelope(e) => e.into_parts(),
112 Record::Command(c) => {
113 let op = c.op();
114 let header = Header {
115 name: Bytes::new(),
116 value: Bytes::from_static(op.to_id()),
117 };
118 (vec![header], c.payload())
119 }
120 }
121 }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct Sequenced<T> {
126 position: StreamPosition,
127 inner: T,
128}
129
130impl<T> Sequenced<T> {
131 pub const fn new(position: StreamPosition, inner: T) -> Self {
132 Self { position, inner }
133 }
134
135 pub const fn position(&self) -> &StreamPosition {
136 &self.position
137 }
138
139 pub fn inner(&self) -> &T {
140 &self.inner
141 }
142
143 pub fn as_ref(&self) -> Sequenced<&T> {
144 Sequenced::new(self.position, &self.inner)
145 }
146
147 pub fn parts(&self) -> (StreamPosition, &T) {
148 (self.position, &self.inner)
149 }
150
151 pub fn into_parts(self) -> (StreamPosition, T) {
152 (self.position, self.inner)
153 }
154}
155
156pub type SequencedRecord = Sequenced<Record>;
157
158impl<T> MeteredSize for Sequenced<T>
159where
160 T: MeteredSize,
161{
162 fn metered_size(&self) -> usize {
163 self.inner.metered_size()
164 }
165}
166
167impl<T> DeepSize for Sequenced<T>
168where
169 T: DeepSize,
170{
171 fn deep_size(&self) -> usize {
172 self.position.deep_size() + self.inner.deep_size()
173 }
174}
175
176impl<T> Metered<T>
177where
178 T: MeteredSize,
179{
180 pub fn sequenced(self, position: StreamPosition) -> Metered<Sequenced<T>> {
181 Metered::with_size(
182 self.metered_size(),
183 Sequenced::new(position, self.into_inner()),
184 )
185 }
186}
187
188impl<T> Metered<Sequenced<T>> {
189 pub fn parts(&self) -> (StreamPosition, Metered<&T>) {
190 let size = self.metered_size();
191 let (position, inner) = self.as_ref().into_inner().parts();
192 (position, Metered::with_size(size, inner))
193 }
194
195 pub fn into_parts(self) -> (StreamPosition, Metered<T>) {
196 let size = self.metered_size();
197 let (position, inner) = self.into_inner().into_parts();
198 (position, Metered::with_size(size, inner))
199 }
200}
201
202#[cfg(test)]
203mod test {
204 use rstest::rstest;
205
206 use super::*;
207
208 fn semantic_metered_size(record: &Record) -> usize {
209 let (headers, body) = record.clone().into_parts();
210 8 + (2 * headers.len())
211 + headers
212 .iter()
213 .map(|header| header.name.len() + header.value.len())
214 .sum::<usize>()
215 + body.len()
216 }
217
218 #[test]
219 fn empty_header_name_solo() {
220 let headers = vec![Header {
221 name: Bytes::new(),
222 value: Bytes::from("hi"),
223 }];
224 let body = Bytes::from("hello");
225 assert_eq!(
226 Record::try_from_parts(headers, body),
227 Err(RecordPartsError::UnknownCommand)
228 );
229 }
230
231 #[test]
232 fn empty_header_name_among_others() {
233 let headers = vec![
234 Header {
235 name: Bytes::from("boku"),
236 value: Bytes::from("hi"),
237 },
238 Header {
239 name: Bytes::new(),
240 value: Bytes::from("hi"),
241 },
242 ];
243 let body = Bytes::from("hello");
244 assert_eq!(
245 Record::try_from_parts(headers, body),
246 Err(RecordPartsError::Header(HeaderValidationError::NameEmpty))
247 );
248 }
249
250 fn command_parts(op: &'static [u8], payload: &'static [u8]) -> (Vec<Header>, Bytes) {
251 let headers = vec![Header {
252 name: Bytes::new(),
253 value: Bytes::from_static(op),
254 }];
255 let body = Bytes::from_static(payload);
256 (headers, body)
257 }
258
259 fn assert_valid_command_record(op: &'static [u8], payload: &'static [u8]) {
260 let (headers, body) = command_parts(op, payload);
261 let record = Record::try_from_parts(headers.clone(), body.clone()).unwrap();
262 let record_metered = record.metered_size();
263 match &record {
264 Record::Command(cmd) => {
265 assert_eq!(cmd.op().to_id(), op);
266 assert_eq!(cmd.payload().as_ref(), payload);
267 }
268 other => panic!("Command expected, got {other:?}"),
269 }
270 assert_eq!(record_metered, semantic_metered_size(&record));
271 let sequenced_record = record.clone().sequenced(StreamPosition {
272 seq_num: 42,
273 timestamp: 100_000,
274 });
275 let sequenced_metered = sequenced_record.metered_size();
276 assert_eq!(record_metered, sequenced_metered);
277 assert_eq!(
278 sequenced_record.position,
279 StreamPosition {
280 seq_num: 42,
281 timestamp: 100_000,
282 }
283 );
284 assert_eq!(
285 sequenced_record.inner,
286 Record::try_from_parts(headers, body).unwrap()
287 );
288 }
289
290 #[rstest]
291 #[case::fence_empty(b"fence", b"")]
292 #[case::fence_uuid(b"fence", b"my-special-uuid")]
293 #[case::trim_0(b"trim", b"\x00\x00\x00\x00\x00\x00\x00\x00")]
294 fn valid_command_records(#[case] op: &'static [u8], #[case] payload: &'static [u8]) {
295 assert_valid_command_record(op, payload);
296 }
297
298 #[rstest]
299 #[case::fence_too_long(
300 b"fence",
301 b"toolongtoolongtoolongtoolongtoolongtoolongtoolong",
302 RecordPartsError::CommandPayload(
303 CommandOp::Fence,
304 CommandPayloadError::FencingTokenTooLong(FencingTokenTooLongError(49)),
305 )
306 )]
307 #[case::trim_empty(
308 b"trim",
309 b"",
310 RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(0),)
311 )]
312 #[case::trim_overflow(
313 b"trim",
314 b"\x00\x00\x00\x00\x00\x00\x00\x00\x00",
315 RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(9),)
316 )]
317 fn invalid_command_records(
318 #[case] op: &'static [u8],
319 #[case] payload: &'static [u8],
320 #[case] expected: RecordPartsError,
321 ) {
322 let (headers, body) = command_parts(op, payload);
323 assert_eq!(Record::try_from_parts(headers, body), Err(expected));
324 }
325}