binate/frame/codec/
payload.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
4/// The payload frame.
5///
6/// # Frame Contents
7///
8/// ```text
9///  0                   1                   2                   3
10///  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
11/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
12/// |                           Stream ID                           |
13/// +-----------+-+-+-+-+-+---------+-------------------------------+
14/// |Frame Type |0|M|F|C|N|  Flags  |
15/// +-------------------------------+-------------------------------+
16///                      Metadata & Data
17/// ```
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct PayloadFrame {
20    stream_id: u32,
21    flags: Flags,
22    payload: Payload,
23}
24
25impl PayloadFrame {
26    /// Type of this frame.
27    pub const TYPE: FrameType = FrameType::PAYLOAD;
28
29    /// Create a new `Payload` frame.
30    ///
31    /// - `stream_id` MUST be <= [`MAX_U31`].
32    /// - flag `follows` means more fragments follow this fragment.
33    /// - flag `complete` indicates stream completion. If set, `on_complete()` will be invoked on
34    /// Subscriber/Observer.
35    /// - flag `next` indicates Next (Payload Data and/or Metadata present). If set,
36    /// `on_next(Payload)` will be invoked on Subscriber/Observer.
37    ///
38    /// A PAYLOAD MUST NOT have both (C)complete and (N)ext empty (false). See [`Payload Frame`]
39    /// section in the spec for more details.
40    ///
41    /// [`Payload Frame`]: https://rsocket.io/about/protocol/#payload-frame-0x0a
42    pub fn new(stream_id: u32, mut flags: Flags, payload: Payload) -> Self {
43        debug_assert_max_u31!(stream_id);
44        let stream_id = stream_id & MAX_U31;
45        flags &= Flags::FOLLOWS | Flags::COMPLETE | Flags::NEXT;
46        if payload.has_metadata() {
47            flags |= Flags::METADATA
48        }
49        PayloadFrame { stream_id, flags, payload }
50    }
51
52    /// Returns the stream ID of this frame.
53    pub fn stream_id(&self) -> u32 {
54        self.stream_id
55    }
56
57    /// Returns true if this frame has the FOLLOWS flag set.
58    pub fn is_follows(&self) -> bool {
59        self.flags.contains(Flags::FOLLOWS)
60    }
61
62    /// Returns true if this frame has the COMPLETE flag set.
63    pub fn is_complete(&self) -> bool {
64        self.flags.contains(Flags::COMPLETE)
65    }
66
67    /// Returns true if this frame has the NEXT flag set.
68    pub fn is_next(&self) -> bool {
69        self.flags.contains(Flags::NEXT)
70    }
71
72    /// Returns the metadata attached to this frame, if any.
73    pub fn metadata(&self) -> Option<&Bytes> {
74        self.payload.metadata()
75    }
76
77    /// Returns the data attached to this frame, if any.
78    pub fn data(&self) -> Option<&Bytes> {
79        self.payload.data()
80    }
81
82    /// Returns the payload attached to this frame.
83    pub fn payload(self) -> Payload {
84        self.payload
85    }
86}
87
88impl Encode for PayloadFrame {
89    fn encode(&self, buf: &mut BytesMut) {
90        buf.put_u32(self.stream_id);
91        buf.put_u16(FrameType::PAYLOAD.bits() | self.flags.bits());
92        let u24 = U24::from_usize(
93            self.payload.metadata().map(|v| v.len()).unwrap_or_default(),
94        );
95        buf.put_u8(u24.0);
96        buf.put_u16(u24.1);
97        self.payload.encode(buf);
98    }
99
100    fn len(&self) -> usize {
101        // len(stream_id): 4
102        // len(flags): 2
103        // len(metadata_len): 3
104        // len(payload)
105        9 + self.payload.len()
106    }
107}
108
109impl Decode for PayloadFrame {
110    type Value = Self;
111
112    fn decode<B: Buf>(
113        buf: &mut B,
114        stream_id: u32,
115        flags: Flags,
116    ) -> Result<Self::Value> {
117        let payload = eat_payload(buf, true)?;
118        Ok(PayloadFrame { stream_id, flags, payload })
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a frame through `encode` and `decode`, checking the encoded size,
    /// the header fields read back, and that `len()` agrees with the bytes written.
    #[test]
    fn test_codec() {
        let payload = Payload::builder()
            .set_metadata(Bytes::from("metadata"))
            .set_data(Bytes::from("data"))
            .build();
        let frame = PayloadFrame::new(1, Flags::FOLLOWS | Flags::NEXT, payload);

        let mut encoded = BytesMut::new();
        frame.encode(&mut encoded);
        let mut wire = encoded.freeze();

        // stream_id (4) + type/flags (2) + metadata length (3)
        //   + "metadata" (8) + "data" (4)
        let wire_len = wire.len();
        assert_eq!(wire_len, 4 + 2 + 3 + 8 + 4);

        // The stream ID and the type/flags word are consumed first; `decode` only
        // handles what comes after them.
        let stream_id = eat_stream_id(&mut wire).unwrap();
        let (frame_type, flags) = eat_flags(&mut wire).unwrap();
        assert_eq!(frame_type, FrameType::PAYLOAD);
        assert_eq!(flags, Flags::METADATA | Flags::FOLLOWS | Flags::NEXT);

        let decoded = PayloadFrame::decode(&mut wire, stream_id, flags).unwrap();

        assert_eq!(decoded, frame);
        assert_eq!(frame.len(), wire_len);
        assert_eq!(decoded.len(), wire_len);
    }
}
162}