Skip to main content

procwire_client/protocol/
wire_format.rs

1//! Wire format encoding and decoding.
2//!
3//! Implements the 11-byte header format:
4//! ```text
5//! ┌──────────┬───────┬──────────┬──────────┐
6//! │ Method ID│ Flags │ Req ID   │ Length   │
7//! │ 2 bytes  │ 1 byte│ 4 bytes  │ 4 bytes  │
8//! │ uint16 BE│       │ uint32 BE│ uint32 BE│
9//! └──────────┴───────┴──────────┴──────────┘
10//! ```
11//!
12//! All multi-byte integers are Big Endian.
13
14use crate::error::{ProcwireError, Result};
15
16/// Header size in bytes (fixed, exactly 11).
17pub const HEADER_SIZE: usize = 11;
18
19/// Default maximum payload size (1 GB).
20pub const DEFAULT_MAX_PAYLOAD_SIZE: u32 = 1_073_741_824;
21
22/// Absolute maximum payload size (~2 GB, max i32).
23pub const ABSOLUTE_MAX_PAYLOAD_SIZE: u32 = 2_147_483_647;
24
25/// Reserved method ID for ABORT signal.
26pub const ABORT_METHOD_ID: u16 = 0xFFFF;
27
28/// Reserved method ID (never use).
29pub const RESERVED_METHOD_ID: u16 = 0;
30
31/// Header pool size for ring buffer optimization.
32pub const HEADER_POOL_SIZE: usize = 16;
33
34/// Flag constants for the protocol.
35pub mod flags {
36    /// Direction: to parent (1) or to child (0).
37    pub const DIRECTION_TO_PARENT: u8 = 0b0000_0001;
38    /// Message type: response (1) or request/event (0).
39    pub const IS_RESPONSE: u8 = 0b0000_0010;
40    /// Error flag: error (1) or ok (0).
41    pub const IS_ERROR: u8 = 0b0000_0100;
42    /// Stream flag: stream chunk (1) or single message (0).
43    pub const IS_STREAM: u8 = 0b0000_1000;
44    /// Stream end flag: final chunk (1) or more coming (0).
45    pub const STREAM_END: u8 = 0b0001_0000;
46    /// ACK flag: acknowledgment only (1) or full response (0).
47    pub const IS_ACK: u8 = 0b0010_0000;
48
49    /// Reserved bits mask (bits 6-7).
50    pub const RESERVED_MASK: u8 = 0b1100_0000;
51
52    /// Check if a specific flag is set.
53    #[inline]
54    pub fn has_flag(flags: u8, flag: u8) -> bool {
55        flags & flag != 0
56    }
57
58    // Common flag combinations for responses
59
60    /// Response flags: to_parent + is_response = 0x03
61    pub const RESPONSE: u8 = DIRECTION_TO_PARENT | IS_RESPONSE;
62    /// Error response flags: to_parent + is_response + is_error = 0x07
63    pub const ERROR_RESPONSE: u8 = DIRECTION_TO_PARENT | IS_RESPONSE | IS_ERROR;
64    /// Stream chunk flags: to_parent + is_response + is_stream = 0x0B
65    pub const STREAM_CHUNK: u8 = DIRECTION_TO_PARENT | IS_RESPONSE | IS_STREAM;
66    /// Stream end flags: to_parent + is_response + is_stream + stream_end = 0x1B
67    pub const STREAM_END_RESPONSE: u8 = DIRECTION_TO_PARENT | IS_RESPONSE | IS_STREAM | STREAM_END;
68    /// ACK response flags: to_parent + is_response + is_ack = 0x23
69    pub const ACK_RESPONSE: u8 = DIRECTION_TO_PARENT | IS_RESPONSE | IS_ACK;
70}
71
72/// Decoded header from wire format.
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub struct Header {
75    /// Method identifier (1-65534, 0 reserved, 0xFFFF = abort).
76    pub method_id: u16,
77    /// Flags byte (see `flags` module).
78    pub flags: u8,
79    /// Request identifier (0 = event/fire-and-forget).
80    pub request_id: u32,
81    /// Payload length in bytes.
82    pub payload_length: u32,
83}
84
85impl Header {
86    /// Create a new header.
87    pub fn new(method_id: u16, flags: u8, request_id: u32, payload_length: u32) -> Self {
88        Self {
89            method_id,
90            flags,
91            request_id,
92            payload_length,
93        }
94    }
95
96    /// Encode header to bytes (Big Endian).
97    ///
98    /// # Example
99    ///
100    /// ```
101    /// use procwire_client::protocol::{Header, flags};
102    ///
103    /// let header = Header::new(1, flags::RESPONSE, 42, 100);
104    /// let bytes = header.encode();
105    /// assert_eq!(bytes.len(), 11);
106    /// ```
107    pub fn encode(&self) -> [u8; HEADER_SIZE] {
108        let mut buf = [0u8; HEADER_SIZE];
109        self.encode_into(&mut buf);
110        buf
111    }
112
113    /// Encode header into an existing buffer.
114    ///
115    /// # Panics
116    ///
117    /// Panics if buffer is smaller than `HEADER_SIZE` (11 bytes).
118    pub fn encode_into(&self, buf: &mut [u8]) {
119        debug_assert!(buf.len() >= HEADER_SIZE);
120        buf[0..2].copy_from_slice(&self.method_id.to_be_bytes());
121        buf[2] = self.flags;
122        buf[3..7].copy_from_slice(&self.request_id.to_be_bytes());
123        buf[7..11].copy_from_slice(&self.payload_length.to_be_bytes());
124    }
125
126    /// Decode header from bytes (Big Endian).
127    ///
128    /// Returns `None` if buffer is too short.
129    ///
130    /// # Example
131    ///
132    /// ```
133    /// use procwire_client::protocol::Header;
134    ///
135    /// let bytes = [0, 1, 0x03, 0, 0, 0, 42, 0, 0, 0, 100];
136    /// let header = Header::decode(&bytes).unwrap();
137    /// assert_eq!(header.method_id, 1);
138    /// assert_eq!(header.request_id, 42);
139    /// assert_eq!(header.payload_length, 100);
140    /// ```
141    pub fn decode(buf: &[u8]) -> Option<Self> {
142        if buf.len() < HEADER_SIZE {
143            return None;
144        }
145        Some(Self {
146            method_id: u16::from_be_bytes([buf[0], buf[1]]),
147            flags: buf[2],
148            request_id: u32::from_be_bytes([buf[3], buf[4], buf[5], buf[6]]),
149            payload_length: u32::from_be_bytes([buf[7], buf[8], buf[9], buf[10]]),
150        })
151    }
152
153    /// Validate the header for protocol compliance.
154    ///
155    /// Checks:
156    /// - Method ID is not 0 (reserved)
157    /// - Payload length doesn't exceed max
158    /// - Reserved flag bits are 0
159    pub fn validate(&self, max_payload_size: u32) -> Result<()> {
160        if self.method_id == RESERVED_METHOD_ID {
161            return Err(ProcwireError::Protocol(
162                "Method ID 0 is reserved".to_string(),
163            ));
164        }
165
166        if self.payload_length > max_payload_size {
167            return Err(ProcwireError::Protocol(format!(
168                "Payload size {} exceeds maximum {}",
169                self.payload_length, max_payload_size
170            )));
171        }
172
173        if self.flags & flags::RESERVED_MASK != 0 {
174            return Err(ProcwireError::Protocol(
175                "Reserved flag bits must be 0".to_string(),
176            ));
177        }
178
179        Ok(())
180    }
181
182    /// Check if this is an abort signal.
183    #[inline]
184    pub fn is_abort(&self) -> bool {
185        self.method_id == ABORT_METHOD_ID
186    }
187
188    /// Check if this is a response.
189    #[inline]
190    pub fn is_response(&self) -> bool {
191        flags::has_flag(self.flags, flags::IS_RESPONSE)
192    }
193
194    /// Check if this is an error response.
195    #[inline]
196    pub fn is_error(&self) -> bool {
197        flags::has_flag(self.flags, flags::IS_ERROR)
198    }
199
200    /// Check if this is a stream chunk.
201    #[inline]
202    pub fn is_stream(&self) -> bool {
203        flags::has_flag(self.flags, flags::IS_STREAM)
204    }
205
206    /// Check if this is the final stream chunk.
207    #[inline]
208    pub fn is_stream_end(&self) -> bool {
209        flags::has_flag(self.flags, flags::STREAM_END)
210    }
211
212    /// Check if this is an ACK.
213    #[inline]
214    pub fn is_ack(&self) -> bool {
215        flags::has_flag(self.flags, flags::IS_ACK)
216    }
217
218    /// Check if direction is to parent.
219    #[inline]
220    pub fn is_to_parent(&self) -> bool {
221        flags::has_flag(self.flags, flags::DIRECTION_TO_PARENT)
222    }
223
224    /// Check if this is an event (request_id == 0).
225    #[inline]
226    pub fn is_event(&self) -> bool {
227        self.request_id == 0
228    }
229}
230
231/// Encode a header to bytes (standalone function).
232#[inline]
233pub fn encode_header(header: &Header) -> [u8; HEADER_SIZE] {
234    header.encode()
235}
236
237/// Encode a header into an existing buffer (standalone function).
238#[inline]
239pub fn encode_header_into(buf: &mut [u8], header: &Header) {
240    header.encode_into(buf);
241}
242
243/// Decode a header from bytes (standalone function).
244#[inline]
245pub fn decode_header(buf: &[u8]) -> Option<Header> {
246    Header::decode(buf)
247}
248
249/// Validate a header for protocol compliance (standalone function).
250#[inline]
251pub fn validate_header(header: &Header, max_payload_size: u32) -> Result<()> {
252    header.validate(max_payload_size)
253}
254
255#[cfg(test)]
256mod tests {
257    use super::*;
258
259    #[test]
260    fn test_header_encode_decode_roundtrip() {
261        let original = Header::new(1, flags::RESPONSE, 42, 100);
262        let encoded = original.encode();
263        let decoded = Header::decode(&encoded).unwrap();
264        assert_eq!(original, decoded);
265    }
266
267    #[test]
268    fn test_header_big_endian_byte_order() {
269        let header = Header::new(0x0102, 0x03, 0x04050607, 0x08090A0B);
270        let bytes = header.encode();
271
272        // Method ID: 0x0102 in BE
273        assert_eq!(bytes[0], 0x01);
274        assert_eq!(bytes[1], 0x02);
275
276        // Flags: 0x03
277        assert_eq!(bytes[2], 0x03);
278
279        // Request ID: 0x04050607 in BE
280        assert_eq!(bytes[3], 0x04);
281        assert_eq!(bytes[4], 0x05);
282        assert_eq!(bytes[5], 0x06);
283        assert_eq!(bytes[6], 0x07);
284
285        // Payload length: 0x08090A0B in BE
286        assert_eq!(bytes[7], 0x08);
287        assert_eq!(bytes[8], 0x09);
288        assert_eq!(bytes[9], 0x0A);
289        assert_eq!(bytes[10], 0x0B);
290    }
291
292    #[test]
293    fn test_header_size_is_exactly_11() {
294        assert_eq!(HEADER_SIZE, 11);
295        let header = Header::new(1, 0, 1, 0);
296        assert_eq!(header.encode().len(), 11);
297    }
298
299    #[test]
300    fn test_decode_too_short_buffer() {
301        let buf = [0u8; 10]; // One byte short
302        assert!(Header::decode(&buf).is_none());
303    }
304
305    #[test]
306    fn test_validate_method_id_zero_rejected() {
307        let header = Header::new(0, 0, 1, 0);
308        let result = header.validate(DEFAULT_MAX_PAYLOAD_SIZE);
309        assert!(result.is_err());
310        assert!(result
311            .unwrap_err()
312            .to_string()
313            .contains("Method ID 0 is reserved"));
314    }
315
316    #[test]
317    fn test_validate_method_id_abort_allowed() {
318        let header = Header::new(ABORT_METHOD_ID, 0, 0, 0);
319        let result = header.validate(DEFAULT_MAX_PAYLOAD_SIZE);
320        assert!(result.is_ok());
321        assert!(header.is_abort());
322    }
323
324    #[test]
325    fn test_validate_payload_too_large() {
326        let header = Header::new(1, 0, 1, 1_000_000);
327        let result = header.validate(100); // Max 100 bytes
328        assert!(result.is_err());
329        assert!(result.unwrap_err().to_string().contains("exceeds maximum"));
330    }
331
332    #[test]
333    fn test_validate_reserved_bits_must_be_zero() {
334        let header = Header::new(1, 0b1000_0000, 1, 0); // Bit 7 set
335        let result = header.validate(DEFAULT_MAX_PAYLOAD_SIZE);
336        assert!(result.is_err());
337        assert!(result
338            .unwrap_err()
339            .to_string()
340            .contains("Reserved flag bits"));
341    }
342
343    #[test]
344    fn test_flags_has_flag() {
345        assert!(flags::has_flag(flags::RESPONSE, flags::DIRECTION_TO_PARENT));
346        assert!(flags::has_flag(flags::RESPONSE, flags::IS_RESPONSE));
347        assert!(!flags::has_flag(flags::RESPONSE, flags::IS_ERROR));
348    }
349
350    #[test]
351    fn test_flag_combinations() {
352        // Response: 0x03 = to_parent + is_response
353        assert_eq!(flags::RESPONSE, 0x03);
354
355        // Error response: 0x07 = to_parent + is_response + is_error
356        assert_eq!(flags::ERROR_RESPONSE, 0x07);
357
358        // Stream chunk: 0x0B = to_parent + is_response + is_stream
359        assert_eq!(flags::STREAM_CHUNK, 0x0B);
360
361        // Stream end: 0x1B = to_parent + is_response + is_stream + stream_end
362        assert_eq!(flags::STREAM_END_RESPONSE, 0x1B);
363
364        // ACK: 0x23 = to_parent + is_response + is_ack
365        assert_eq!(flags::ACK_RESPONSE, 0x23);
366    }
367
368    #[test]
369    fn test_header_accessors() {
370        let header = Header::new(1, flags::STREAM_END_RESPONSE, 0, 0);
371
372        assert!(header.is_response());
373        assert!(header.is_stream());
374        assert!(header.is_stream_end());
375        assert!(header.is_to_parent());
376        assert!(header.is_event()); // request_id == 0
377        assert!(!header.is_error());
378        assert!(!header.is_ack());
379        assert!(!header.is_abort());
380    }
381
382    #[test]
383    fn test_min_max_values() {
384        // Minimum valid method ID
385        let min_header = Header::new(1, 0, 0, 0);
386        assert!(min_header.validate(DEFAULT_MAX_PAYLOAD_SIZE).is_ok());
387
388        // Maximum method ID before abort
389        let max_header = Header::new(0xFFFE, 0, u32::MAX, u32::MAX);
390        assert!(max_header.validate(u32::MAX).is_ok());
391    }
392
393    #[test]
394    fn test_encode_into() {
395        let header = Header::new(1, flags::RESPONSE, 42, 100);
396        let mut buf = [0u8; HEADER_SIZE];
397        header.encode_into(&mut buf);
398
399        let decoded = Header::decode(&buf).unwrap();
400        assert_eq!(header, decoded);
401    }
402
403    #[test]
404    fn test_standalone_functions() {
405        let header = Header::new(1, 0, 1, 0);
406
407        let encoded = encode_header(&header);
408        let decoded = decode_header(&encoded).unwrap();
409        assert_eq!(header, decoded);
410
411        let mut buf = [0u8; HEADER_SIZE];
412        encode_header_into(&mut buf, &header);
413        assert_eq!(buf, encoded);
414
415        assert!(validate_header(&header, DEFAULT_MAX_PAYLOAD_SIZE).is_ok());
416    }
417}