Skip to main content

s2_common/record/
command.rs

1use std::{fmt, str::Utf8Error};
2
3use bytes::Bytes;
4use compact_str::CompactString;
5
6use super::{FencingTokenTooLongError, MeteredSize, fencing::FencingToken};
7use crate::{deep_size::DeepSize, record::SeqNum};
8
/// Identifier bytes of the fence command (what [`CommandOp::Fence`] maps to and from).
pub const COMMAND_ID_FENCE: &[u8] = b"fence";
/// Identifier bytes of the trim command (what [`CommandOp::Trim`] maps to and from).
pub const COMMAND_ID_TRIM: &[u8] = b"trim";
11
/// The kind of operation a command record carries.
///
/// Each variant is paired with one identifier byte string; the mapping in both
/// directions is provided by `to_id` and `from_id`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CommandOp {
    /// Fence operation, identified by [`COMMAND_ID_FENCE`].
    Fence,
    /// Trim operation, identified by [`COMMAND_ID_TRIM`].
    Trim,
}
17
18impl CommandOp {
19    pub fn to_id(self) -> &'static [u8] {
20        match self {
21            Self::Fence => COMMAND_ID_FENCE,
22            Self::Trim => COMMAND_ID_TRIM,
23        }
24    }
25
26    pub fn from_id(name: &[u8]) -> Option<Self> {
27        match name {
28            COMMAND_ID_FENCE => Some(Self::Fence),
29            COMMAND_ID_TRIM => Some(Self::Trim),
30            _ => None,
31        }
32    }
33}
34
35impl fmt::Display for CommandOp {
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        let name = std::str::from_utf8(self.to_id()).map_err(|_| fmt::Error)?;
38        f.write_str(name)
39    }
40}
41
/// A command together with its decoded payload.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum CommandRecord {
    /// Fence command carrying a fencing token.
    Fence(FencingToken),
    /// Trim command carrying the sequence number to trim to (the "trim point").
    Trim(SeqNum),
}
47
48impl DeepSize for CommandRecord {
49    fn deep_size(&self) -> usize {
50        match self {
51            Self::Fence(token) => token.deep_size(),
52            Self::Trim(seq_num) => seq_num.deep_size(),
53        }
54    }
55}
56
57impl MeteredSize for CommandRecord {
58    fn metered_size(&self) -> usize {
59        8 + 2
60            + self.op().to_id().len()
61            + match self {
62                Self::Fence(token) => token.len(),
63                Self::Trim(trim_point) => size_of_val(trim_point),
64            }
65    }
66}
67
68impl CommandRecord {
69    pub fn op(&self) -> CommandOp {
70        match self {
71            CommandRecord::Fence(_) => CommandOp::Fence,
72            CommandRecord::Trim(_) => CommandOp::Trim,
73        }
74    }
75
76    pub fn payload(&self) -> Bytes {
77        match self {
78            Self::Fence(token) => Bytes::copy_from_slice(token.as_bytes()),
79            Self::Trim(trim_point) => Bytes::copy_from_slice(&trim_point.to_be_bytes()),
80        }
81    }
82
83    pub fn try_from_parts(op: CommandOp, payload: &[u8]) -> Result<Self, CommandPayloadError> {
84        match op {
85            CommandOp::Fence => {
86                let token = CompactString::from_utf8(payload)
87                    .map_err(CommandPayloadError::InvalidUtf8)?
88                    .try_into()?;
89                Ok(Self::Fence(token))
90            }
91            CommandOp::Trim => {
92                let trim_point = SeqNum::from_be_bytes(
93                    payload
94                        .try_into()
95                        .map_err(|_| CommandPayloadError::TrimPointSize(payload.len()))?,
96                );
97                Ok(Self::Trim(trim_point))
98            }
99        }
100    }
101}
102
/// Reasons a raw payload can fail to decode into a [`CommandRecord`].
#[derive(Debug, PartialEq, thiserror::Error)]
pub enum CommandPayloadError {
    /// The fence payload was not valid UTF-8.
    #[error("invalid UTF-8")]
    InvalidUtf8(Utf8Error),
    /// The fence payload was rejected as too long for a fencing token; the
    /// message is forwarded from the wrapped error.
    #[error(transparent)]
    FencingTokenTooLong(#[from] FencingTokenTooLongError),
    /// The trim payload had the wrong length; carries the length that was received.
    #[error("earliest sequence number to trim to was {0} bytes, must be 8")]
    TrimPointSize(usize),
}
112
// Unit tests for command identifiers, payload decoding, and metered sizes.
#[cfg(test)]
mod tests {
    use compact_str::ToCompactString;
    use proptest::prelude::*;
    use rstest::rstest;

    use super::*;

    // Every op's identifier maps back to the same op; unknown identifiers map to `None`.
    #[test]
    fn command_op_names() {
        for cmd in [CommandOp::Fence, CommandOp::Trim] {
            let name = cmd.to_id();
            assert_eq!(CommandOp::from_id(name), Some(cmd));
        }
        assert_eq!(CommandOp::from_id(b""), None);
        assert_eq!(CommandOp::from_id(b"invalid"), None);
    }

    // A fence payload that is not valid UTF-8 is reported as `InvalidUtf8`.
    #[test]
    fn fencing_token_invalid_utf8() {
        assert!(matches!(
            CommandRecord::try_from_parts(CommandOp::Fence, &[0xff]),
            Err(CommandPayloadError::InvalidUtf8(_))
        ));
    }

    // A 40-byte fence payload is rejected, and the error carries the length 40.
    #[test]
    fn fencing_token_too_long() {
        assert_eq!(
            CommandRecord::try_from_parts(
                CommandOp::Fence,
                b"0123456789012345678901234567890123456789"
            ),
            Err(CommandPayloadError::FencingTokenTooLong(
                FencingTokenTooLongError(40)
            ))
        );
    }

    // Decoding a token's bytes yields the same fence record, and the metered
    // size equals the fixed overhead plus identifier length plus token length.
    #[rstest]
    #[case::empty("")]
    #[case::arbit("arbitrary")]
    #[case::full("0123456789012345")]
    fn fence_roundtrip(#[case] token: &str) {
        let cmd = CommandRecord::Fence(FencingToken::try_from(token.to_compact_string()).unwrap());
        assert_eq!(
            CommandRecord::try_from_parts(CommandOp::Fence, token.as_bytes()),
            Ok(cmd.clone())
        );
        assert_eq!(
            cmd.metered_size(),
            8 + 2 + CommandOp::Fence.to_id().len() + token.len()
        );
    }

    // Trim payloads of the wrong length are rejected with the received length.
    #[rstest]
    #[case::empty(b"")]
    #[case::too_small(b"0123")]
    #[case::too_big(b"0123456789")]
    fn trim_point_size(#[case] payload: &[u8]) {
        assert_eq!(
            CommandRecord::try_from_parts(CommandOp::Trim, payload),
            Err(CommandPayloadError::TrimPointSize(payload.len()))
        );
    }

    // Pins the metered-size formula for one fence record and one trim record.
    #[test]
    fn metered_size_is_computed_without_materializing_payload() {
        let fence =
            CommandRecord::Fence(FencingToken::try_from("fence-me".to_compact_string()).unwrap());
        assert_eq!(
            fence.metered_size(),
            8 + 2 + CommandOp::Fence.to_id().len() + "fence-me".len()
        );

        let trim = CommandRecord::Trim(42);
        assert_eq!(
            trim.metered_size(),
            8 + 2 + CommandOp::Trim.to_id().len() + size_of_val(&42u64)
        );
    }

    proptest! {
        // Any sequence number survives encoding to big-endian bytes and decoding back,
        // and its metered size uses the size of `SeqNum` as the payload length.
        #[test]
        fn trim_roundtrip(trim_point in any::<SeqNum>()) {
            let cmd = CommandRecord::Trim(trim_point);
            assert_eq!(CommandRecord::try_from_parts(CommandOp::Trim, trim_point.to_be_bytes().as_slice()), Ok(cmd.clone()));
            assert_eq!(cmd.metered_size(), 8 + 2 + CommandOp::Trim.to_id().len() + size_of::<SeqNum>());
        }
    }
}