distant_net/common/transport/framed/
frame.rs

1use std::borrow::Cow;
2
3use bytes::{Buf, BufMut, BytesMut};
4
5/// Represents a frame whose lifetime is static
6pub type OwnedFrame = Frame<'static>;
7
/// A chunk of bytes packaged so that it can be shipped over the network.
///
/// On the wire a frame is laid out as `{len}{item}`, where `len` is the length of `item` in
/// bytes encoded as a `u64`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame<'a> {
    /// Payload carried by the frame; either borrowed from the caller or owned by the frame
    item: Cow<'a, [u8]>,
}
15
16impl<'a> Frame<'a> {
17    /// Creates a new frame wrapping the `item` that will be shipped across the network.
18    pub fn new(item: &'a [u8]) -> Self {
19        Self {
20            item: Cow::Borrowed(item),
21        }
22    }
23
24    /// Consumes the frame and returns its underlying item.
25    pub fn into_item(self) -> Cow<'a, [u8]> {
26        self.item
27    }
28}
29
30impl Frame<'_> {
31    /// Total bytes to use as the header field denoting a frame's size.
32    pub const HEADER_SIZE: usize = 8;
33
34    /// Creates a new frame with no item.
35    pub fn empty() -> Self {
36        Self::new(&[])
37    }
38
39    /// Returns the len (in bytes) of the item wrapped by the frame.
40    pub fn len(&self) -> usize {
41        self.item.len()
42    }
43
44    /// Returns true if the frame is comprised of zero bytes.
45    pub fn is_empty(&self) -> bool {
46        self.item.is_empty()
47    }
48
49    /// Returns true if the frame is comprised of some bytes.
50    #[inline]
51    pub fn is_nonempty(&self) -> bool {
52        !self.is_empty()
53    }
54
55    /// Returns a reference to the bytes of the frame's item.
56    pub fn as_item(&self) -> &[u8] {
57        &self.item
58    }
59
60    /// Writes the frame to a new [`Vec`] of bytes, returning them on success.
61    pub fn to_bytes(&self) -> Vec<u8> {
62        let mut bytes = BytesMut::new();
63        self.write(&mut bytes);
64        bytes.to_vec()
65    }
66
67    /// Writes the frame to the end of `dst`, including the header representing the length of the
68    /// item as part of the written bytes.
69    pub fn write(&self, dst: &mut BytesMut) {
70        dst.reserve(Self::HEADER_SIZE + self.item.len());
71
72        // Add data in form of {LEN}{ITEM}
73        dst.put_u64((self.item.len()) as u64);
74        dst.put_slice(&self.item);
75    }
76
77    /// Attempts to read a frame from `src`, returning `Some(Frame)` if a frame was found
78    /// (including the header) or `None` if the current `src` does not contain a frame.
79    pub fn read(src: &mut BytesMut) -> Option<OwnedFrame> {
80        // First, check if we have more data than just our frame's message length
81        if src.len() <= Self::HEADER_SIZE {
82            return None;
83        }
84
85        // Second, retrieve total size of our frame's message
86        let item_len = u64::from_be_bytes(src[..Self::HEADER_SIZE].try_into().unwrap()) as usize;
87
88        // Third, check if we have all data for our frame; if not, exit early
89        if src.len() < item_len + Self::HEADER_SIZE {
90            return None;
91        }
92
93        // Fourth, get and return our item
94        let item = src[Self::HEADER_SIZE..(Self::HEADER_SIZE + item_len)].to_vec();
95
96        // Fifth, advance so frame is no longer kept around
97        src.advance(Self::HEADER_SIZE + item_len);
98
99        Some(Frame::from(item))
100    }
101
102    /// Checks if a full frame is available from `src`, returning true if a frame was found false
103    /// if the current `src` does not contain a frame. Does not consume the frame.
104    pub fn available(src: &BytesMut) -> bool {
105        matches!(Frame::read(&mut src.clone()), Some(_))
106    }
107
108    /// Returns a new frame which is identical but has a lifetime tied to this frame.
109    pub fn as_borrowed(&self) -> Frame<'_> {
110        let item = match &self.item {
111            Cow::Borrowed(x) => x,
112            Cow::Owned(x) => x.as_slice(),
113        };
114
115        Frame {
116            item: Cow::Borrowed(item),
117        }
118    }
119
120    /// Converts the [`Frame`] into an owned copy.
121    ///
122    /// If you construct the frame from an item with a non-static lifetime, you may run into
123    /// lifetime problems due to the way the struct is designed. Calling this function will ensure
124    /// that the returned value has a static lifetime.
125    ///
126    /// This is different from just cloning. Cloning the frame will just copy the references, and
127    /// thus the lifetime will remain the same.
128    pub fn into_owned(self) -> OwnedFrame {
129        Frame {
130            item: Cow::from(self.item.into_owned()),
131        }
132    }
133}
134
135impl<'a> From<&'a [u8]> for Frame<'a> {
136    /// Consumes the byte slice and returns a [`Frame`] whose item references those bytes.
137    fn from(item: &'a [u8]) -> Self {
138        Self {
139            item: Cow::Borrowed(item),
140        }
141    }
142}
143
144impl<'a, const N: usize> From<&'a [u8; N]> for Frame<'a> {
145    /// Consumes the byte array slice and returns a [`Frame`] whose item references those bytes.
146    fn from(item: &'a [u8; N]) -> Self {
147        Self {
148            item: Cow::Borrowed(item),
149        }
150    }
151}
152
153impl<const N: usize> From<[u8; N]> for OwnedFrame {
154    /// Consumes an array of bytes and returns a [`Frame`] with an owned item of those bytes
155    /// allocated as a [`Vec`].
156    fn from(item: [u8; N]) -> Self {
157        Self {
158            item: Cow::Owned(item.to_vec()),
159        }
160    }
161}
162
163impl From<Vec<u8>> for OwnedFrame {
164    /// Consumes a [`Vec`] of bytes and returns a [`Frame`] with an owned item of those bytes.
165    fn from(item: Vec<u8>) -> Self {
166        Self {
167            item: Cow::Owned(item),
168        }
169    }
170}
171
172impl AsRef<[u8]> for Frame<'_> {
173    /// Returns a reference to this [`Frame`]'s item as bytes.
174    fn as_ref(&self) -> &[u8] {
175        AsRef::as_ref(&self.item)
176    }
177}
178
179impl Extend<u8> for Frame<'_> {
180    /// Extends the [`Frame`]'s item with the provided bytes, allocating an owned [`Vec`]
181    /// underneath if this frame had borrowed bytes as an item.
182    fn extend<T: IntoIterator<Item = u8>>(&mut self, iter: T) {
183        match &mut self.item {
184            // If we only have a borrowed item, we need to allocate it into a new vec so we can
185            // extend it with additional bytes
186            Cow::Borrowed(item) => {
187                let mut item = item.to_vec();
188                item.extend(iter);
189                self.item = Cow::Owned(item);
190            }
191
192            // Othewise, if we already have an owned allocation of bytes, we just extend it
193            Cow::Owned(item) => {
194                item.extend(iter);
195            }
196        }
197    }
198}
199
200impl PartialEq<[u8]> for Frame<'_> {
201    /// Test if [`Frame`]'s item matches the provided bytes.
202    fn eq(&self, item: &[u8]) -> bool {
203        self.item.as_ref().eq(item)
204    }
205}
206
207impl<'a> PartialEq<&'a [u8]> for Frame<'_> {
208    /// Test if [`Frame`]'s item matches the provided bytes.
209    fn eq(&self, item: &&'a [u8]) -> bool {
210        self.item.as_ref().eq(*item)
211    }
212}
213
214impl<const N: usize> PartialEq<[u8; N]> for Frame<'_> {
215    /// Test if [`Frame`]'s item matches the provided bytes.
216    fn eq(&self, item: &[u8; N]) -> bool {
217        self.item.as_ref().eq(item)
218    }
219}
220
221impl<'a, const N: usize> PartialEq<&'a [u8; N]> for Frame<'_> {
222    /// Test if [`Frame`]'s item matches the provided bytes.
223    fn eq(&self, item: &&'a [u8; N]) -> bool {
224        self.item.as_ref().eq(*item)
225    }
226}
227
#[cfg(test)]
mod tests {
    use test_log::test;

    use super::*;

    #[test]
    fn write_should_succeed_when_item_is_zero_bytes() {
        let frame = Frame::new(&[]);

        let mut buf = BytesMut::new();
        frame.write(&mut buf);

        // Writing a frame of zero bytes means that the header is all zeros and there is
        // no item that follows the header
        assert_eq!(buf.as_ref(), &[0, 0, 0, 0, 0, 0, 0, 0]);
    }

    #[test]
    fn write_should_build_a_frame_containing_a_length_and_item() {
        let frame = Frame::new(b"hello, world");

        let mut buf = BytesMut::new();
        frame.write(&mut buf);

        let len = buf.get_u64() as usize;
        assert_eq!(len, 12, "Wrong length writed");
        assert_eq!(buf.as_ref(), b"hello, world");
    }

    #[test]
    fn read_should_return_none_if_data_smaller_than_or_equal_to_item_length_field() {
        // Buffer contains exactly the header and nothing else
        let mut buf = BytesMut::new();
        buf.put_bytes(0, Frame::HEADER_SIZE);

        let result = Frame::read(&mut buf);
        assert!(result.is_none(), "Unexpected result: {:?}", result);
    }

    #[test]
    fn read_should_return_none_if_not_enough_data_for_frame() {
        // Header announces a 12-byte item, but only 5 bytes of it have arrived so far
        let mut buf = BytesMut::new();
        buf.put_u64(12);
        buf.put_slice(b"hello");

        let result = Frame::read(&mut buf);
        assert!(result.is_none(), "Unexpected result: {:?}", result);

        // A failed read must leave the partial frame in place for a later attempt
        assert_eq!(
            buf.len(),
            Frame::HEADER_SIZE + 5,
            "Partial frame was consumed"
        );
    }

    #[test]
    fn read_should_succeed_if_written_item_length_is_zero() {
        let mut buf = BytesMut::new();
        buf.put_u64(0);
        buf.put_u8(255);

        // Reading will result in a frame of zero bytes
        let frame = Frame::read(&mut buf).expect("missing frame");
        assert_eq!(frame, Frame::empty());

        // Nothing following the frame header should have been extracted
        assert_eq!(buf.as_ref(), &[255]);
    }

    #[test]
    fn read_should_advance_src_by_frame_size_even_if_item_length_is_zero() {
        let mut buf = BytesMut::new();
        buf.put_u64(0);
        buf.put_bytes(0, 3);

        assert_eq!(Frame::read(&mut buf).unwrap(), Frame::empty());
        assert_eq!(buf.len(), 3, "Advanced an unexpected amount in src buf");
    }

    #[test]
    fn read_should_advance_src_by_frame_size_when_successful() {
        // Add 3 extra bytes after a full frame
        let mut buf = BytesMut::new();
        Frame::new(b"hello, world").write(&mut buf);
        buf.put_bytes(0, 3);

        assert!(
            Frame::read(&mut buf).is_some(),
            "read unexpectedly missing frame"
        );
        assert_eq!(buf.len(), 3, "Advanced an unexpected amount in src buf");
    }

    #[test]
    fn read_should_return_some_byte_vec_when_successful() {
        let mut buf = BytesMut::new();
        Frame::new(b"hello, world").write(&mut buf);

        let item = Frame::read(&mut buf).expect("missing frame");
        assert_eq!(item, b"hello, world");
    }

    #[test]
    fn available_should_report_full_frame_without_consuming_it() {
        // Start with a partial frame: header for 12 bytes, 5 bytes present
        let mut buf = BytesMut::new();
        buf.put_u64(12);
        buf.put_slice(b"hello");
        assert!(
            !Frame::available(&buf),
            "Partial frame reported as available"
        );

        // Complete the frame
        buf.put_slice(b", world");
        assert!(Frame::available(&buf), "Full frame not reported");

        // Checking availability must not remove anything from the buffer
        assert_eq!(
            buf.len(),
            Frame::HEADER_SIZE + 12,
            "Checking availability consumed data"
        );
    }
}