distant_net/common/transport/framed/
frame.rs1use std::borrow::Cow;
2
3use bytes::{Buf, BufMut, BytesMut};
4
5pub type OwnedFrame = Frame<'static>;
7
8#[derive(Clone, Debug, PartialEq, Eq)]
11pub struct Frame<'a> {
12 item: Cow<'a, [u8]>,
14}
15
16impl<'a> Frame<'a> {
17 pub fn new(item: &'a [u8]) -> Self {
19 Self {
20 item: Cow::Borrowed(item),
21 }
22 }
23
24 pub fn into_item(self) -> Cow<'a, [u8]> {
26 self.item
27 }
28}
29
30impl Frame<'_> {
31 pub const HEADER_SIZE: usize = 8;
33
34 pub fn empty() -> Self {
36 Self::new(&[])
37 }
38
39 pub fn len(&self) -> usize {
41 self.item.len()
42 }
43
44 pub fn is_empty(&self) -> bool {
46 self.item.is_empty()
47 }
48
49 #[inline]
51 pub fn is_nonempty(&self) -> bool {
52 !self.is_empty()
53 }
54
55 pub fn as_item(&self) -> &[u8] {
57 &self.item
58 }
59
60 pub fn to_bytes(&self) -> Vec<u8> {
62 let mut bytes = BytesMut::new();
63 self.write(&mut bytes);
64 bytes.to_vec()
65 }
66
67 pub fn write(&self, dst: &mut BytesMut) {
70 dst.reserve(Self::HEADER_SIZE + self.item.len());
71
72 dst.put_u64((self.item.len()) as u64);
74 dst.put_slice(&self.item);
75 }
76
77 pub fn read(src: &mut BytesMut) -> Option<OwnedFrame> {
80 if src.len() <= Self::HEADER_SIZE {
82 return None;
83 }
84
85 let item_len = u64::from_be_bytes(src[..Self::HEADER_SIZE].try_into().unwrap()) as usize;
87
88 if src.len() < item_len + Self::HEADER_SIZE {
90 return None;
91 }
92
93 let item = src[Self::HEADER_SIZE..(Self::HEADER_SIZE + item_len)].to_vec();
95
96 src.advance(Self::HEADER_SIZE + item_len);
98
99 Some(Frame::from(item))
100 }
101
102 pub fn available(src: &BytesMut) -> bool {
105 matches!(Frame::read(&mut src.clone()), Some(_))
106 }
107
108 pub fn as_borrowed(&self) -> Frame<'_> {
110 let item = match &self.item {
111 Cow::Borrowed(x) => x,
112 Cow::Owned(x) => x.as_slice(),
113 };
114
115 Frame {
116 item: Cow::Borrowed(item),
117 }
118 }
119
120 pub fn into_owned(self) -> OwnedFrame {
129 Frame {
130 item: Cow::from(self.item.into_owned()),
131 }
132 }
133}
134
135impl<'a> From<&'a [u8]> for Frame<'a> {
136 fn from(item: &'a [u8]) -> Self {
138 Self {
139 item: Cow::Borrowed(item),
140 }
141 }
142}
143
144impl<'a, const N: usize> From<&'a [u8; N]> for Frame<'a> {
145 fn from(item: &'a [u8; N]) -> Self {
147 Self {
148 item: Cow::Borrowed(item),
149 }
150 }
151}
152
153impl<const N: usize> From<[u8; N]> for OwnedFrame {
154 fn from(item: [u8; N]) -> Self {
157 Self {
158 item: Cow::Owned(item.to_vec()),
159 }
160 }
161}
162
163impl From<Vec<u8>> for OwnedFrame {
164 fn from(item: Vec<u8>) -> Self {
166 Self {
167 item: Cow::Owned(item),
168 }
169 }
170}
171
172impl AsRef<[u8]> for Frame<'_> {
173 fn as_ref(&self) -> &[u8] {
175 AsRef::as_ref(&self.item)
176 }
177}
178
179impl Extend<u8> for Frame<'_> {
180 fn extend<T: IntoIterator<Item = u8>>(&mut self, iter: T) {
183 match &mut self.item {
184 Cow::Borrowed(item) => {
187 let mut item = item.to_vec();
188 item.extend(iter);
189 self.item = Cow::Owned(item);
190 }
191
192 Cow::Owned(item) => {
194 item.extend(iter);
195 }
196 }
197 }
198}
199
200impl PartialEq<[u8]> for Frame<'_> {
201 fn eq(&self, item: &[u8]) -> bool {
203 self.item.as_ref().eq(item)
204 }
205}
206
207impl<'a> PartialEq<&'a [u8]> for Frame<'_> {
208 fn eq(&self, item: &&'a [u8]) -> bool {
210 self.item.as_ref().eq(*item)
211 }
212}
213
214impl<const N: usize> PartialEq<[u8; N]> for Frame<'_> {
215 fn eq(&self, item: &[u8; N]) -> bool {
217 self.item.as_ref().eq(item)
218 }
219}
220
221impl<'a, const N: usize> PartialEq<&'a [u8; N]> for Frame<'_> {
222 fn eq(&self, item: &&'a [u8; N]) -> bool {
224 self.item.as_ref().eq(*item)
225 }
226}
227
228#[cfg(test)]
229mod tests {
230 use test_log::test;
231
232 use super::*;
233
234 #[test]
235 fn write_should_succeed_when_item_is_zero_bytes() {
236 let frame = Frame::new(&[]);
237
238 let mut buf = BytesMut::new();
239 frame.write(&mut buf);
240
241 assert_eq!(buf.as_ref(), &[0, 0, 0, 0, 0, 0, 0, 0]);
244 }
245
246 #[test]
247 fn write_should_build_a_frame_containing_a_length_and_item() {
248 let frame = Frame::new(b"hello, world");
249
250 let mut buf = BytesMut::new();
251 frame.write(&mut buf);
252
253 let len = buf.get_u64() as usize;
254 assert_eq!(len, 12, "Wrong length writed");
255 assert_eq!(buf.as_ref(), b"hello, world");
256 }
257
258 #[test]
259 fn read_should_return_none_if_data_smaller_than_or_equal_to_item_length_field() {
260 let mut buf = BytesMut::new();
261 buf.put_bytes(0, Frame::HEADER_SIZE);
262
263 let result = Frame::read(&mut buf);
264 assert!(matches!(result, None), "Unexpected result: {:?}", result);
265 }
266
267 #[test]
268 fn read_should_return_none_if_not_enough_data_for_frame() {
269 let mut buf = BytesMut::new();
270 buf.put_u64(0);
271
272 let result = Frame::read(&mut buf);
273 assert!(matches!(result, None), "Unexpected result: {:?}", result);
274 }
275
276 #[test]
277 fn read_should_succeed_if_written_item_length_is_zero() {
278 let mut buf = BytesMut::new();
279 buf.put_u64(0);
280 buf.put_u8(255);
281
282 let frame = Frame::read(&mut buf).expect("missing frame");
284 assert_eq!(frame, Frame::empty());
285
286 assert_eq!(buf.as_ref(), &[255]);
288 }
289
290 #[test]
291 fn read_should_advance_src_by_frame_size_even_if_item_length_is_zero() {
292 let mut buf = BytesMut::new();
293 buf.put_u64(0);
294 buf.put_bytes(0, 3);
295
296 assert_eq!(Frame::read(&mut buf).unwrap(), Frame::empty());
297 assert_eq!(buf.len(), 3, "Advanced an unexpected amount in src buf");
298 }
299
300 #[test]
301 fn read_should_advance_src_by_frame_size_when_successful() {
302 let mut buf = BytesMut::new();
304 Frame::new(b"hello, world").write(&mut buf);
305 buf.put_bytes(0, 3);
306
307 assert!(
308 Frame::read(&mut buf).is_some(),
309 "read unexpectedly missing frame"
310 );
311 assert_eq!(buf.len(), 3, "Advanced an unexpected amount in src buf");
312 }
313
314 #[test]
315 fn read_should_return_some_byte_vec_when_successful() {
316 let mut buf = BytesMut::new();
317 Frame::new(b"hello, world").write(&mut buf);
318
319 let item = Frame::read(&mut buf).expect("missing frame");
320 assert_eq!(item, b"hello, world");
321 }
322}