1use crate::{Error, QoS, Result, MAGIC_BYTE};
22use bytes::{Buf, BufMut, Bytes, BytesMut};
23
24pub const HEADER_SIZE: usize = 4;
26
27pub const HEADER_SIZE_WITH_TS: usize = 12;
29
30pub const MAX_PAYLOAD_SIZE: usize = 65535;
32
33#[derive(Debug, Clone, Copy, Default)]
35pub struct FrameFlags {
36 pub qos: QoS,
37 pub has_timestamp: bool,
38 pub encrypted: bool,
39 pub compressed: bool,
40}
41
42impl FrameFlags {
43 pub fn to_byte(&self) -> u8 {
44 let mut flags = 0u8;
45 flags |= (self.qos as u8) << 6;
46 if self.has_timestamp {
47 flags |= 0x20;
48 }
49 if self.encrypted {
50 flags |= 0x10;
51 }
52 if self.compressed {
53 flags |= 0x08;
54 }
55 flags
56 }
57
58 pub fn from_byte(byte: u8) -> Self {
59 Self {
60 qos: QoS::from_u8((byte >> 6) & 0x03).unwrap_or(QoS::Fire),
61 has_timestamp: (byte & 0x20) != 0,
62 encrypted: (byte & 0x10) != 0,
63 compressed: (byte & 0x08) != 0,
64 }
65 }
66}
67
68#[derive(Debug, Clone)]
70pub struct Frame {
71 pub flags: FrameFlags,
72 pub timestamp: Option<u64>,
73 pub payload: Bytes,
74}
75
76impl Frame {
77 pub fn new(payload: impl Into<Bytes>) -> Self {
79 Self {
80 flags: FrameFlags::default(),
81 timestamp: None,
82 payload: payload.into(),
83 }
84 }
85
86 pub fn with_qos(mut self, qos: QoS) -> Self {
88 self.flags.qos = qos;
89 self
90 }
91
92 pub fn with_timestamp(mut self, timestamp: u64) -> Self {
94 self.timestamp = Some(timestamp);
95 self.flags.has_timestamp = true;
96 self
97 }
98
99 pub fn with_encrypted(mut self, encrypted: bool) -> Self {
101 self.flags.encrypted = encrypted;
102 self
103 }
104
105 pub fn with_compressed(mut self, compressed: bool) -> Self {
107 self.flags.compressed = compressed;
108 self
109 }
110
111 pub fn size(&self) -> usize {
113 let header = if self.flags.has_timestamp {
114 HEADER_SIZE_WITH_TS
115 } else {
116 HEADER_SIZE
117 };
118 header + self.payload.len()
119 }
120
121 pub fn encode(&self) -> Result<Bytes> {
123 if self.payload.len() > MAX_PAYLOAD_SIZE {
124 return Err(Error::PayloadTooLarge(self.payload.len()));
125 }
126
127 let mut buf = BytesMut::with_capacity(self.size());
128
129 buf.put_u8(MAGIC_BYTE);
131
132 buf.put_u8(self.flags.to_byte());
134
135 buf.put_u16(self.payload.len() as u16);
137
138 if let Some(ts) = self.timestamp {
140 buf.put_u64(ts);
141 }
142
143 buf.extend_from_slice(&self.payload);
145
146 Ok(buf.freeze())
147 }
148
149 pub fn decode(mut buf: impl Buf) -> Result<Self> {
151 if buf.remaining() < HEADER_SIZE {
152 return Err(Error::BufferTooSmall {
153 needed: HEADER_SIZE,
154 have: buf.remaining(),
155 });
156 }
157
158 let magic = buf.get_u8();
160 if magic != MAGIC_BYTE {
161 return Err(Error::InvalidMagic(magic));
162 }
163
164 let flags = FrameFlags::from_byte(buf.get_u8());
166
167 let payload_len = buf.get_u16() as usize;
169
170 let header_size = if flags.has_timestamp {
172 HEADER_SIZE_WITH_TS
173 } else {
174 HEADER_SIZE
175 };
176 let total_remaining = if flags.has_timestamp { 8 } else { 0 } + payload_len;
177
178 if buf.remaining() < total_remaining {
179 return Err(Error::BufferTooSmall {
180 needed: header_size + payload_len,
181 have: HEADER_SIZE + buf.remaining(),
182 });
183 }
184
185 let timestamp = if flags.has_timestamp {
187 Some(buf.get_u64())
188 } else {
189 None
190 };
191
192 let payload = buf.copy_to_bytes(payload_len);
194
195 Ok(Self {
196 flags,
197 timestamp,
198 payload,
199 })
200 }
201
202 pub fn check_complete(buf: &[u8]) -> Option<usize> {
204 if buf.len() < HEADER_SIZE {
205 return None;
206 }
207
208 if buf[0] != MAGIC_BYTE {
209 return None;
210 }
211
212 let flags = FrameFlags::from_byte(buf[1]);
213 let payload_len = u16::from_be_bytes([buf[2], buf[3]]) as usize;
214
215 let header_size = if flags.has_timestamp {
216 HEADER_SIZE_WITH_TS
217 } else {
218 HEADER_SIZE
219 };
220
221 let total_size = header_size + payload_len;
222
223 if buf.len() >= total_size {
224 Some(total_size)
225 } else {
226 None
227 }
228 }
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234
235 #[test]
236 fn test_frame_encode_decode() {
237 let payload = b"hello world";
238 let frame = Frame::new(payload.as_slice())
239 .with_qos(QoS::Confirm)
240 .with_timestamp(1234567890);
241
242 let encoded = frame.encode().unwrap();
243 let decoded = Frame::decode(&encoded[..]).unwrap();
244
245 assert_eq!(decoded.flags.qos, QoS::Confirm);
246 assert_eq!(decoded.timestamp, Some(1234567890));
247 assert_eq!(decoded.payload.as_ref(), payload);
248 }
249
250 #[test]
251 fn test_flags_roundtrip() {
252 let flags = FrameFlags {
253 qos: QoS::Commit,
254 has_timestamp: true,
255 encrypted: true,
256 compressed: false,
257 };
258
259 let byte = flags.to_byte();
260 let decoded = FrameFlags::from_byte(byte);
261
262 assert_eq!(decoded.qos, QoS::Commit);
263 assert!(decoded.has_timestamp);
264 assert!(decoded.encrypted);
265 assert!(!decoded.compressed);
266 }
267
268 #[test]
269 fn test_check_complete() {
270 let frame = Frame::new(b"test".as_slice());
271 let encoded = frame.encode().unwrap();
272
273 assert_eq!(Frame::check_complete(&encoded), Some(encoded.len()));
275
276 assert_eq!(Frame::check_complete(&encoded[..2]), None);
278
279 assert_eq!(Frame::check_complete(&encoded[..5]), None);
281 }
282}