binate/frame/codec/
keepalive.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
4/// The keepalive frame.
5///
6/// # Frame Contents
7///
8/// ```text
9/// 0                   1                   2                   3
10/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
11/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
12/// |                         Stream ID = 0                         |
13/// +-----------+-+-+-+-------------+-------------------------------+
14/// |Frame Type |0|0|R|    Flags    |
15/// +-----------+-+-+-+-------------+-------------------------------+
16/// |0|                  Last Received Position                     |
17/// +                                                               +
18/// |                                                               |
19/// +---------------------------------------------------------------+
20///                              Data
21/// ```
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct KeepaliveFrame {
24    respond: bool,
25    last_received_position: u64,
26    data: Option<Bytes>,
27}
28
29impl KeepaliveFrame {
30    /// KEEPALIVE frames MUST always use Stream ID 0 as they pertain to the Connection.
31    pub const STREAM_ID: u32 = 0;
32
33    /// Type of this frame.
34    pub const TYPE: FrameType = FrameType::KEEPALIVE;
35
36    /// Create a new `Keepalive` frame.
37    ///
38    /// - `last_received_position` MUST be <= [`MAX_U63`].
39    pub fn new(
40        last_received_position: u64,
41        data: Option<Bytes>,
42        respond: bool,
43    ) -> Self {
44        debug_assert_max_u63!(last_received_position);
45        KeepaliveFrame {
46            respond,
47            last_received_position: last_received_position & MAX_U63,
48            data,
49        }
50    }
51
52    /// Returns the last received position of this frame.
53    pub fn last_received_position(&self) -> u64 {
54        self.last_received_position
55    }
56
57    /// Returns the data attached to this frame, if any.
58    pub fn data(&self) -> Option<&Bytes> {
59        self.data.as_ref()
60    }
61
62    /// Returns true if this frame has the Respond flag set.
63    pub fn is_respond(&self) -> bool {
64        self.respond
65    }
66}
67
68impl Encode for KeepaliveFrame {
69    fn encode(&self, buf: &mut BytesMut) {
70        buf.put_u32(0);
71        if self.respond {
72            buf.put_u16(FrameType::KEEPALIVE.bits() | Flags::RESPOND.bits());
73        } else {
74            buf.put_u16(FrameType::KEEPALIVE.bits());
75        }
76        buf.put_u64(self.last_received_position);
77        if let Some(data) = &self.data {
78            buf.put_slice(data);
79        }
80    }
81
82    fn len(&self) -> usize {
83        // len(stream_id): 4
84        // len(flags): 2
85        // len(last_received_position): 8
86        let mut len = 14;
87
88        // len(data)
89        if let Some(data) = &self.data {
90            len += data.len();
91        }
92        len
93    }
94}
95
96impl Decode for KeepaliveFrame {
97    type Value = Self;
98
99    fn decode<B: Buf>(
100        buf: &mut B,
101        stream_id: u32,
102        flags: Flags,
103    ) -> Result<Self::Value> {
104        if stream_id != 0 {
105            return Err(DecodeError::InvalidStreamId {
106                expected: "0",
107                found: stream_id,
108            });
109        }
110        let respond = flags.contains(Flags::RESPOND);
111        let last_received_position = eat_u63(buf)?;
112        let data = match buf.remaining() {
113            0 => None,
114            len => Some(eat_bytes(buf, len)?),
115        };
116        Ok(KeepaliveFrame { respond, last_received_position, data })
117    }
118}
119
120#[cfg(test)]
121mod tests {
122    use super::*;
123
124    #[test]
125    fn test_stream_id() {
126        assert_eq!(KeepaliveFrame::STREAM_ID, 0);
127    }
128
129    #[test]
130    fn test_codec() {
131        let lease = KeepaliveFrame::new(1, Some(Bytes::from("data")), true);
132
133        let mut buf = BytesMut::new();
134        lease.encode(&mut buf);
135        let mut buf = buf.freeze();
136
137        // len(stream_id): 4
138        // len(flags): 2
139        // len(last_received_position): 8
140        // len(data): 4
141        let buf_len = buf.len();
142        assert_eq!(buf_len, 4 + 2 + 8 + 4);
143
144        // Eat the stream_id and flags before decoding bytes.
145        let stream_id = eat_stream_id(&mut buf).unwrap();
146        let (frame_type, flags) = eat_flags(&mut buf).unwrap();
147        assert_eq!(stream_id, 0);
148        assert_eq!(frame_type, FrameType::KEEPALIVE);
149        assert_eq!(flags, Flags::RESPOND);
150
151        let decoded =
152            KeepaliveFrame::decode(&mut buf, stream_id, flags).unwrap();
153
154        assert_eq!(decoded, lease);
155        assert_eq!(lease.len(), buf_len);
156        assert_eq!(decoded.len(), buf_len);
157    }
158}