dynamo_runtime/transports/event_plane/
frame.rs1use bytes::{Buf, BufMut, Bytes, BytesMut};
11use thiserror::Error;
12
13pub const FRAME_VERSION: u8 = 1;
15
16pub const FRAME_HEADER_SIZE: usize = 5;
18
19#[derive(Debug, Error)]
21pub enum FrameError {
22 #[error("Incomplete frame header: expected {FRAME_HEADER_SIZE} bytes, got {0} bytes")]
23 IncompleteHeader(usize),
24
25 #[error("Incomplete frame payload: expected {expected} bytes, got {available} bytes")]
26 IncompletePayload { expected: usize, available: usize },
27
28 #[error("Unsupported protocol version: {0} (expected {FRAME_VERSION})")]
29 UnsupportedVersion(u8),
30
31 #[error("Frame too large: {0} bytes exceeds maximum")]
32 FrameTooLarge(usize),
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub struct FrameHeader {
38 pub version: u8,
40 pub payload_len: u32,
42}
43
44impl FrameHeader {
45 pub fn encode(&self, buf: &mut BytesMut) {
47 buf.put_u8(self.version);
48 buf.put_u32(self.payload_len);
49 }
50
51 pub fn decode(buf: &mut impl Buf) -> Result<Self, FrameError> {
53 if buf.remaining() < FRAME_HEADER_SIZE {
54 return Err(FrameError::IncompleteHeader(buf.remaining()));
55 }
56
57 let version = buf.get_u8();
58 if version != FRAME_VERSION {
59 return Err(FrameError::UnsupportedVersion(version));
60 }
61
62 let payload_len = buf.get_u32();
63
64 Ok(FrameHeader {
65 version,
66 payload_len,
67 })
68 }
69
70 pub fn frame_size(&self) -> usize {
72 FRAME_HEADER_SIZE + self.payload_len as usize
73 }
74}
75
76#[derive(Debug, Clone)]
78pub struct Frame {
79 pub header: FrameHeader,
80 pub payload: Bytes,
81}
82
83impl Frame {
84 pub fn new(payload: Bytes) -> Self {
85 Self {
86 header: FrameHeader {
87 version: FRAME_VERSION,
88 payload_len: payload.len() as u32,
89 },
90 payload,
91 }
92 }
93
94 pub fn encode(&self) -> Bytes {
96 let mut buf = BytesMut::with_capacity(self.header.frame_size());
97 self.header.encode(&mut buf);
98 buf.put(self.payload.clone());
99 buf.freeze()
100 }
101
102 pub fn decode(mut buf: impl Buf) -> Result<Self, FrameError> {
104 let header = FrameHeader::decode(&mut buf)?;
105
106 let payload_len = header.payload_len as usize;
107 if buf.remaining() < payload_len {
108 return Err(FrameError::IncompletePayload {
109 expected: payload_len,
110 available: buf.remaining(),
111 });
112 }
113
114 let payload = buf.copy_to_bytes(payload_len);
115
116 Ok(Frame { header, payload })
117 }
118
119 pub fn size(&self) -> usize {
120 self.header.frame_size()
121 }
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127
128 #[test]
129 fn test_frame_header_encode_decode() {
130 let header = FrameHeader {
131 version: FRAME_VERSION,
132 payload_len: 1024,
133 };
134
135 let mut buf = BytesMut::new();
136 header.encode(&mut buf);
137
138 assert_eq!(buf.len(), FRAME_HEADER_SIZE);
139
140 let decoded = FrameHeader::decode(&mut buf).unwrap();
141 assert_eq!(decoded.version, header.version);
142 assert_eq!(decoded.payload_len, header.payload_len);
143 }
144
145 #[test]
146 fn test_frame_encode_decode_roundtrip() {
147 let payload = Bytes::from("hello world");
148 let frame = Frame::new(payload.clone());
149
150 let encoded = frame.encode();
151 let decoded = Frame::decode(encoded).unwrap();
152
153 assert_eq!(decoded.header.version, FRAME_VERSION);
154 assert_eq!(decoded.payload, payload);
155 }
156
157 #[test]
158 fn test_frame_error_incomplete_header() {
159 let buf = Bytes::from(vec![1, 2, 3]); let result = Frame::decode(buf);
161 assert!(matches!(result, Err(FrameError::IncompleteHeader(3))));
162 }
163
164 #[test]
165 fn test_frame_error_incomplete_payload() {
166 let mut buf = BytesMut::new();
167 let header = FrameHeader {
168 version: FRAME_VERSION,
169 payload_len: 1000, };
171 header.encode(&mut buf);
172 buf.put_slice(b"short"); let result = Frame::decode(buf.freeze());
175 assert!(matches!(
176 result,
177 Err(FrameError::IncompletePayload {
178 expected: 1000,
179 available: 5
180 })
181 ));
182 }
183
184 #[test]
185 fn test_frame_error_unsupported_version() {
186 let mut buf = BytesMut::new();
187 buf.put_u8(99); buf.put_u32(0); let result = FrameHeader::decode(&mut buf);
191 assert!(matches!(result, Err(FrameError::UnsupportedVersion(99))));
192 }
193
194 #[test]
195 fn test_zero_length_payload() {
196 let payload = Bytes::new();
197 let frame = Frame::new(payload.clone());
198
199 let encoded = frame.encode();
200 assert_eq!(encoded.len(), FRAME_HEADER_SIZE);
201
202 let decoded = Frame::decode(encoded).unwrap();
203 assert_eq!(decoded.payload.len(), 0);
204 }
205}