Skip to main content

s2_common/record/
command.rs

1use std::{fmt, str::Utf8Error};
2
3use bytes::Bytes;
4use compact_str::CompactString;
5
6use super::{FencingTokenTooLongError, MeteredSize, fencing::FencingToken};
7use crate::{deep_size::DeepSize, record::SeqNum};
8
/// The kind of operation a command carries.
///
/// Each variant has a fixed byte identifier; [`CommandOp::to_id`] and
/// [`CommandOp::from_id`] convert between the two representations.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommandOp {
    /// Identified by the bytes `fence`.
    Fence,
    /// Identified by the bytes `trim`.
    Trim,
}

impl CommandOp {
    /// Every variant, in declaration order; scanned by [`CommandOp::from_id`].
    const ALL: [Self; 2] = [Self::Fence, Self::Trim];

    /// Returns the byte identifier of this operation.
    pub fn to_id(self) -> &'static [u8] {
        match self {
            CommandOp::Fence => b"fence",
            CommandOp::Trim => b"trim",
        }
    }

    /// Looks up the operation whose identifier is exactly `name`.
    ///
    /// Returns `None` when `name` matches no identifier. The comparison is
    /// byte-for-byte, so it is case-sensitive and does not trim whitespace.
    pub fn from_id(name: &[u8]) -> Option<Self> {
        // Comparing through `to_id` keeps both directions of the mapping in
        // one place.
        Self::ALL.iter().copied().find(|op| op.to_id() == name)
    }
}
31
32impl fmt::Display for CommandOp {
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        let name = std::str::from_utf8(self.to_id()).map_err(|_| fmt::Error)?;
35        f.write_str(name)
36    }
37}
38
39#[derive(Debug, PartialEq, Eq, Clone)]
40pub enum CommandRecord {
41    Fence(FencingToken),
42    Trim(SeqNum),
43}
44
45impl DeepSize for CommandRecord {
46    fn deep_size(&self) -> usize {
47        match self {
48            Self::Fence(token) => token.deep_size(),
49            Self::Trim(seq_num) => seq_num.deep_size(),
50        }
51    }
52}
53
54impl MeteredSize for CommandRecord {
55    fn metered_size(&self) -> usize {
56        8 + 2
57            + self.op().to_id().len()
58            + match self {
59                Self::Fence(token) => token.len(),
60                Self::Trim(trim_point) => size_of_val(trim_point),
61            }
62    }
63}
64
65impl CommandRecord {
66    pub fn op(&self) -> CommandOp {
67        match self {
68            CommandRecord::Fence(_) => CommandOp::Fence,
69            CommandRecord::Trim(_) => CommandOp::Trim,
70        }
71    }
72
73    pub fn payload(&self) -> Bytes {
74        match self {
75            Self::Fence(token) => Bytes::copy_from_slice(token.as_bytes()),
76            Self::Trim(trim_point) => Bytes::copy_from_slice(&trim_point.to_be_bytes()),
77        }
78    }
79
80    pub fn try_from_parts(op: CommandOp, payload: &[u8]) -> Result<Self, CommandPayloadError> {
81        match op {
82            CommandOp::Fence => {
83                let token = CompactString::from_utf8(payload)
84                    .map_err(CommandPayloadError::InvalidUtf8)?
85                    .try_into()?;
86                Ok(Self::Fence(token))
87            }
88            CommandOp::Trim => {
89                let trim_point = SeqNum::from_be_bytes(
90                    payload
91                        .try_into()
92                        .map_err(|_| CommandPayloadError::TrimPointSize(payload.len()))?,
93                );
94                Ok(Self::Trim(trim_point))
95            }
96        }
97    }
98}
99
100#[derive(Debug, PartialEq, thiserror::Error)]
101pub enum CommandPayloadError {
102    #[error("invalid UTF-8")]
103    InvalidUtf8(Utf8Error),
104    #[error(transparent)]
105    FencingTokenTooLong(#[from] FencingTokenTooLongError),
106    #[error("earliest sequence number to trim to was {0} bytes, must be 8")]
107    TrimPointSize(usize),
108}
109
#[cfg(test)]
mod tests {
    use compact_str::ToCompactString;
    use proptest::prelude::*;
    use rstest::rstest;

    use super::*;

    // Every operation id maps back to its operation; unknown ids map to `None`.
    #[test]
    fn command_op_names() {
        for cmd in [CommandOp::Fence, CommandOp::Trim] {
            let name = cmd.to_id();
            assert_eq!(CommandOp::from_id(name), Some(cmd));
        }
        assert_eq!(CommandOp::from_id(b""), None);
        assert_eq!(CommandOp::from_id(b"invalid"), None);
    }

    // A fence payload that is not UTF-8 is reported as `InvalidUtf8`.
    #[test]
    fn fencing_token_invalid_utf8() {
        assert!(matches!(
            CommandRecord::try_from_parts(CommandOp::Fence, &[0xff]),
            Err(CommandPayloadError::InvalidUtf8(_))
        ));
    }

    // A 40-byte fence payload is rejected, and the error carries that length.
    #[test]
    fn fencing_token_too_long() {
        assert_eq!(
            CommandRecord::try_from_parts(
                CommandOp::Fence,
                b"0123456789012345678901234567890123456789"
            ),
            Err(CommandPayloadError::FencingTokenTooLong(
                FencingTokenTooLongError(40)
            ))
        );
    }

    // Valid tokens decode to the same command and are metered by their length.
    #[rstest]
    #[case::empty("")]
    #[case::arbit("arbitrary")]
    #[case::full("0123456789012345")]
    fn fence_roundtrip(#[case] token: &str) {
        let cmd = CommandRecord::Fence(FencingToken::try_from(token.to_compact_string()).unwrap());
        assert_eq!(
            CommandRecord::try_from_parts(CommandOp::Fence, token.as_bytes()),
            Ok(cmd.clone())
        );
        assert_eq!(
            cmd.metered_size(),
            8 + 2 + CommandOp::Fence.to_id().len() + token.len()
        );
    }

    // Trim payloads of the wrong length are rejected with the length received.
    #[rstest]
    #[case::empty(b"")]
    #[case::too_small(b"0123")]
    #[case::too_big(b"0123456789")]
    fn trim_point_size(#[case] payload: &[u8]) {
        assert_eq!(
            CommandRecord::try_from_parts(CommandOp::Trim, payload),
            Err(CommandPayloadError::TrimPointSize(payload.len()))
        );
    }

    // Pins the metered size of one fence and one trim command.
    #[test]
    fn metered_size_is_computed_without_materializing_payload() {
        let fence =
            CommandRecord::Fence(FencingToken::try_from("fence-me".to_compact_string()).unwrap());
        assert_eq!(
            fence.metered_size(),
            8 + 2 + CommandOp::Fence.to_id().len() + "fence-me".len()
        );

        let trim = CommandRecord::Trim(42);
        assert_eq!(
            trim.metered_size(),
            8 + 2 + CommandOp::Trim.to_id().len() + size_of_val(&42u64)
        );
    }

    proptest! {
        // Any sequence number survives a big-endian encode/decode round trip.
        #[test]
        fn trim_roundtrip(trim_point in any::<SeqNum>()) {
            let cmd = CommandRecord::Trim(trim_point);
            // `prop_assert_eq!` returns a test-case failure to the proptest
            // runner instead of panicking, which keeps shrinking output clean.
            prop_assert_eq!(
                CommandRecord::try_from_parts(CommandOp::Trim, trim_point.to_be_bytes().as_slice()),
                Ok(cmd.clone())
            );
            prop_assert_eq!(
                cmd.metered_size(),
                8 + 2 + CommandOp::Trim.to_id().len() + size_of::<SeqNum>()
            );
        }
    }
}