clasp_core/
frame.rs

1//! Binary frame encoding/decoding
2//!
3//! Clasp frame format:
4//! ```text
5//! ┌─────────────────────────────────────────────────────────────────┐
6//! │ Byte 0:     Magic (0x53 = 'S')                                  │
7//! │ Byte 1:     Flags                                               │
8//! │             [7:6] QoS (00=fire, 01=confirm, 10=commit, 11=rsv)  │
9//! │             [5]   Timestamp present                             │
10//! │             [4]   Encrypted                                     │
11//! │             [3]   Compressed                                    │
12//! │             [2:0] Reserved                                      │
13//! │ Byte 2-3:   Payload Length (uint16 big-endian, max 65535)       │
14//! ├─────────────────────────────────────────────────────────────────┤
15//! │ [If timestamp flag] Bytes 4-11: Timestamp (uint64 µs)           │
16//! ├─────────────────────────────────────────────────────────────────┤
17//! │ Payload (MessagePack encoded)                                   │
18//! └─────────────────────────────────────────────────────────────────┘
19//! ```
20
21use crate::{Error, QoS, Result, MAGIC_BYTE};
22use bytes::{Buf, BufMut, Bytes, BytesMut};
23
24/// Frame header size without timestamp
25pub const HEADER_SIZE: usize = 4;
26
27/// Frame header size with timestamp
28pub const HEADER_SIZE_WITH_TS: usize = 12;
29
30/// Maximum payload size
31pub const MAX_PAYLOAD_SIZE: usize = 65535;
32
33/// Frame flags
34#[derive(Debug, Clone, Copy, Default)]
35pub struct FrameFlags {
36    pub qos: QoS,
37    pub has_timestamp: bool,
38    pub encrypted: bool,
39    pub compressed: bool,
40}
41
42impl FrameFlags {
43    pub fn to_byte(&self) -> u8 {
44        let mut flags = 0u8;
45        flags |= (self.qos as u8) << 6;
46        if self.has_timestamp {
47            flags |= 0x20;
48        }
49        if self.encrypted {
50            flags |= 0x10;
51        }
52        if self.compressed {
53            flags |= 0x08;
54        }
55        flags
56    }
57
58    pub fn from_byte(byte: u8) -> Self {
59        Self {
60            qos: QoS::from_u8((byte >> 6) & 0x03).unwrap_or(QoS::Fire),
61            has_timestamp: (byte & 0x20) != 0,
62            encrypted: (byte & 0x10) != 0,
63            compressed: (byte & 0x08) != 0,
64        }
65    }
66}
67
68/// A Clasp frame
69#[derive(Debug, Clone)]
70pub struct Frame {
71    pub flags: FrameFlags,
72    pub timestamp: Option<u64>,
73    pub payload: Bytes,
74}
75
76impl Frame {
77    /// Create a new frame with payload
78    pub fn new(payload: impl Into<Bytes>) -> Self {
79        Self {
80            flags: FrameFlags::default(),
81            timestamp: None,
82            payload: payload.into(),
83        }
84    }
85
86    /// Create a frame with QoS
87    pub fn with_qos(mut self, qos: QoS) -> Self {
88        self.flags.qos = qos;
89        self
90    }
91
92    /// Create a frame with timestamp
93    pub fn with_timestamp(mut self, timestamp: u64) -> Self {
94        self.timestamp = Some(timestamp);
95        self.flags.has_timestamp = true;
96        self
97    }
98
99    /// Create a frame with encryption flag
100    pub fn with_encrypted(mut self, encrypted: bool) -> Self {
101        self.flags.encrypted = encrypted;
102        self
103    }
104
105    /// Create a frame with compression flag
106    pub fn with_compressed(mut self, compressed: bool) -> Self {
107        self.flags.compressed = compressed;
108        self
109    }
110
111    /// Calculate the total frame size
112    pub fn size(&self) -> usize {
113        let header = if self.flags.has_timestamp {
114            HEADER_SIZE_WITH_TS
115        } else {
116            HEADER_SIZE
117        };
118        header + self.payload.len()
119    }
120
121    /// Encode frame to bytes
122    pub fn encode(&self) -> Result<Bytes> {
123        if self.payload.len() > MAX_PAYLOAD_SIZE {
124            return Err(Error::PayloadTooLarge(self.payload.len()));
125        }
126
127        let mut buf = BytesMut::with_capacity(self.size());
128
129        // Magic byte
130        buf.put_u8(MAGIC_BYTE);
131
132        // Flags
133        buf.put_u8(self.flags.to_byte());
134
135        // Payload length
136        buf.put_u16(self.payload.len() as u16);
137
138        // Timestamp (if present)
139        if let Some(ts) = self.timestamp {
140            buf.put_u64(ts);
141        }
142
143        // Payload
144        buf.extend_from_slice(&self.payload);
145
146        Ok(buf.freeze())
147    }
148
149    /// Decode frame from bytes
150    pub fn decode(mut buf: impl Buf) -> Result<Self> {
151        if buf.remaining() < HEADER_SIZE {
152            return Err(Error::BufferTooSmall {
153                needed: HEADER_SIZE,
154                have: buf.remaining(),
155            });
156        }
157
158        // Magic byte
159        let magic = buf.get_u8();
160        if magic != MAGIC_BYTE {
161            return Err(Error::InvalidMagic(magic));
162        }
163
164        // Flags
165        let flags = FrameFlags::from_byte(buf.get_u8());
166
167        // Payload length
168        let payload_len = buf.get_u16() as usize;
169
170        // Calculate required size
171        let header_size = if flags.has_timestamp {
172            HEADER_SIZE_WITH_TS
173        } else {
174            HEADER_SIZE
175        };
176        let total_remaining = if flags.has_timestamp { 8 } else { 0 } + payload_len;
177
178        if buf.remaining() < total_remaining {
179            return Err(Error::BufferTooSmall {
180                needed: header_size + payload_len,
181                have: HEADER_SIZE + buf.remaining(),
182            });
183        }
184
185        // Timestamp
186        let timestamp = if flags.has_timestamp {
187            Some(buf.get_u64())
188        } else {
189            None
190        };
191
192        // Payload
193        let payload = buf.copy_to_bytes(payload_len);
194
195        Ok(Self {
196            flags,
197            timestamp,
198            payload,
199        })
200    }
201
202    /// Check if buffer contains a complete frame
203    pub fn check_complete(buf: &[u8]) -> Option<usize> {
204        if buf.len() < HEADER_SIZE {
205            return None;
206        }
207
208        if buf[0] != MAGIC_BYTE {
209            return None;
210        }
211
212        let flags = FrameFlags::from_byte(buf[1]);
213        let payload_len = u16::from_be_bytes([buf[2], buf[3]]) as usize;
214
215        let header_size = if flags.has_timestamp {
216            HEADER_SIZE_WITH_TS
217        } else {
218            HEADER_SIZE
219        };
220
221        let total_size = header_size + payload_len;
222
223        if buf.len() >= total_size {
224            Some(total_size)
225        } else {
226            None
227        }
228    }
229}
230
231#[cfg(test)]
232mod tests {
233    use super::*;
234
235    #[test]
236    fn test_frame_encode_decode() {
237        let payload = b"hello world";
238        let frame = Frame::new(payload.as_slice())
239            .with_qos(QoS::Confirm)
240            .with_timestamp(1234567890);
241
242        let encoded = frame.encode().unwrap();
243        let decoded = Frame::decode(&encoded[..]).unwrap();
244
245        assert_eq!(decoded.flags.qos, QoS::Confirm);
246        assert_eq!(decoded.timestamp, Some(1234567890));
247        assert_eq!(decoded.payload.as_ref(), payload);
248    }
249
250    #[test]
251    fn test_flags_roundtrip() {
252        let flags = FrameFlags {
253            qos: QoS::Commit,
254            has_timestamp: true,
255            encrypted: true,
256            compressed: false,
257        };
258
259        let byte = flags.to_byte();
260        let decoded = FrameFlags::from_byte(byte);
261
262        assert_eq!(decoded.qos, QoS::Commit);
263        assert!(decoded.has_timestamp);
264        assert!(decoded.encrypted);
265        assert!(!decoded.compressed);
266    }
267
268    #[test]
269    fn test_check_complete() {
270        let frame = Frame::new(b"test".as_slice());
271        let encoded = frame.encode().unwrap();
272
273        // Complete frame
274        assert_eq!(Frame::check_complete(&encoded), Some(encoded.len()));
275
276        // Incomplete header
277        assert_eq!(Frame::check_complete(&encoded[..2]), None);
278
279        // Incomplete payload
280        assert_eq!(Frame::check_complete(&encoded[..5]), None);
281    }
282}