binate/frame/codec/
keepalive.rs1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
4#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct KeepaliveFrame {
24 respond: bool,
25 last_received_position: u64,
26 data: Option<Bytes>,
27}
28
29impl KeepaliveFrame {
30 pub const STREAM_ID: u32 = 0;
32
33 pub const TYPE: FrameType = FrameType::KEEPALIVE;
35
36 pub fn new(
40 last_received_position: u64,
41 data: Option<Bytes>,
42 respond: bool,
43 ) -> Self {
44 debug_assert_max_u63!(last_received_position);
45 KeepaliveFrame {
46 respond,
47 last_received_position: last_received_position & MAX_U63,
48 data,
49 }
50 }
51
52 pub fn last_received_position(&self) -> u64 {
54 self.last_received_position
55 }
56
57 pub fn data(&self) -> Option<&Bytes> {
59 self.data.as_ref()
60 }
61
62 pub fn is_respond(&self) -> bool {
64 self.respond
65 }
66}
67
68impl Encode for KeepaliveFrame {
69 fn encode(&self, buf: &mut BytesMut) {
70 buf.put_u32(0);
71 if self.respond {
72 buf.put_u16(FrameType::KEEPALIVE.bits() | Flags::RESPOND.bits());
73 } else {
74 buf.put_u16(FrameType::KEEPALIVE.bits());
75 }
76 buf.put_u64(self.last_received_position);
77 if let Some(data) = &self.data {
78 buf.put_slice(data);
79 }
80 }
81
82 fn len(&self) -> usize {
83 let mut len = 14;
87
88 if let Some(data) = &self.data {
90 len += data.len();
91 }
92 len
93 }
94}
95
96impl Decode for KeepaliveFrame {
97 type Value = Self;
98
99 fn decode<B: Buf>(
100 buf: &mut B,
101 stream_id: u32,
102 flags: Flags,
103 ) -> Result<Self::Value> {
104 if stream_id != 0 {
105 return Err(DecodeError::InvalidStreamId {
106 expected: "0",
107 found: stream_id,
108 });
109 }
110 let respond = flags.contains(Flags::RESPOND);
111 let last_received_position = eat_u63(buf)?;
112 let data = match buf.remaining() {
113 0 => None,
114 len => Some(eat_bytes(buf, len)?),
115 };
116 Ok(KeepaliveFrame { respond, last_received_position, data })
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// KEEPALIVE frames use stream id 0.
    #[test]
    fn test_stream_id() {
        assert_eq!(KeepaliveFrame::STREAM_ID, 0);
    }

    /// Encodes a frame, then decodes it again and checks that the result
    /// and the reported lengths match.
    #[test]
    fn test_codec() {
        let keepalive =
            KeepaliveFrame::new(1, Some(Bytes::from("data")), true);

        let mut encoded = BytesMut::new();
        keepalive.encode(&mut encoded);
        let mut wire = encoded.freeze();

        // stream id (4) + type/flags (2) + position (8) + "data" (4)
        let wire_len = wire.len();
        assert_eq!(wire_len, 4 + 2 + 8 + 4);

        // Strip the header fields the way a frame dispatcher would before
        // handing the rest to `KeepaliveFrame::decode`.
        let stream_id = eat_stream_id(&mut wire).unwrap();
        let (frame_type, flags) = eat_flags(&mut wire).unwrap();
        assert_eq!(stream_id, 0);
        assert_eq!(frame_type, FrameType::KEEPALIVE);
        assert_eq!(flags, Flags::RESPOND);

        let decoded =
            KeepaliveFrame::decode(&mut wire, stream_id, flags).unwrap();

        assert_eq!(decoded, keepalive);
        assert_eq!(keepalive.len(), wire_len);
        assert_eq!(decoded.len(), wire_len);
    }
}