Skip to main content

dynamo_runtime/transports/event_plane/
frame.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
//! Binary frame format for the event transport.
//!
//! Every frame is a fixed 5-byte header followed by the payload bytes:
//!
//! - **Version** (1 byte): protocol version, which supports protocol evolution.
//! - **Payload length** (4 bytes, `u32`): enables proper frame boundary detection.
9
10use bytes::{Buf, BufMut, Bytes, BytesMut};
11use thiserror::Error;
12
/// Protocol version written into, and required from, every frame header.
pub const FRAME_VERSION: u8 = 1;

/// Size of the encoded header in bytes: one version byte plus a four-byte
/// payload length.
pub const FRAME_HEADER_SIZE: usize = 5;
18
19/// Frame encoding/decoding errors
20#[derive(Debug, Error)]
21pub enum FrameError {
22    #[error("Incomplete frame header: expected {FRAME_HEADER_SIZE} bytes, got {0} bytes")]
23    IncompleteHeader(usize),
24
25    #[error("Incomplete frame payload: expected {expected} bytes, got {available} bytes")]
26    IncompletePayload { expected: usize, available: usize },
27
28    #[error("Unsupported protocol version: {0} (expected {FRAME_VERSION})")]
29    UnsupportedVersion(u8),
30
31    #[error("Frame too large: {0} bytes exceeds maximum")]
32    FrameTooLarge(usize),
33}
34
/// Fixed-size frame header that precedes every payload on the wire.
///
/// It encodes to 5 bytes: the version byte followed by the payload length.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameHeader {
    /// Protocol version of the frame (currently 1).
    pub version: u8,
    /// Number of payload bytes that follow the header.
    pub payload_len: u32,
}
43
44impl FrameHeader {
45    /// Encode header to bytes
46    pub fn encode(&self, buf: &mut BytesMut) {
47        buf.put_u8(self.version);
48        buf.put_u32(self.payload_len);
49    }
50
51    /// Decode header from bytes
52    pub fn decode(buf: &mut impl Buf) -> Result<Self, FrameError> {
53        if buf.remaining() < FRAME_HEADER_SIZE {
54            return Err(FrameError::IncompleteHeader(buf.remaining()));
55        }
56
57        let version = buf.get_u8();
58        if version != FRAME_VERSION {
59            return Err(FrameError::UnsupportedVersion(version));
60        }
61
62        let payload_len = buf.get_u32();
63
64        Ok(FrameHeader {
65            version,
66            payload_len,
67        })
68    }
69
70    /// Get total frame size (header + payload)
71    pub fn frame_size(&self) -> usize {
72        FRAME_HEADER_SIZE + self.payload_len as usize
73    }
74}
75
76/// Complete frame (header + payload)
77#[derive(Debug, Clone)]
78pub struct Frame {
79    pub header: FrameHeader,
80    pub payload: Bytes,
81}
82
83impl Frame {
84    pub fn new(payload: Bytes) -> Self {
85        Self {
86            header: FrameHeader {
87                version: FRAME_VERSION,
88                payload_len: payload.len() as u32,
89            },
90            payload,
91        }
92    }
93
94    /// Encode frame to wire format
95    pub fn encode(&self) -> Bytes {
96        let mut buf = BytesMut::with_capacity(self.header.frame_size());
97        self.header.encode(&mut buf);
98        buf.put(self.payload.clone());
99        buf.freeze()
100    }
101
102    /// Decode frame from wire format
103    pub fn decode(mut buf: impl Buf) -> Result<Self, FrameError> {
104        let header = FrameHeader::decode(&mut buf)?;
105
106        let payload_len = header.payload_len as usize;
107        if buf.remaining() < payload_len {
108            return Err(FrameError::IncompletePayload {
109                expected: payload_len,
110                available: buf.remaining(),
111            });
112        }
113
114        let payload = buf.copy_to_bytes(payload_len);
115
116        Ok(Frame { header, payload })
117    }
118
119    pub fn size(&self) -> usize {
120        self.header.frame_size()
121    }
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// A header occupies exactly `FRAME_HEADER_SIZE` bytes and survives an
    /// encode/decode round trip.
    #[test]
    fn test_frame_header_encode_decode() {
        let original = FrameHeader {
            version: FRAME_VERSION,
            payload_len: 1024,
        };

        let mut wire = BytesMut::new();
        original.encode(&mut wire);
        assert_eq!(wire.len(), FRAME_HEADER_SIZE);

        let roundtripped = FrameHeader::decode(&mut wire).unwrap();
        assert_eq!(roundtripped.version, original.version);
        assert_eq!(roundtripped.payload_len, original.payload_len);
    }

    /// Encoding a frame and decoding the result yields the same payload.
    #[test]
    fn test_frame_encode_decode_roundtrip() {
        let payload = Bytes::from("hello world");

        let wire = Frame::new(payload.clone()).encode();
        let frame = Frame::decode(wire).unwrap();

        assert_eq!(frame.header.version, FRAME_VERSION);
        assert_eq!(frame.payload, payload);
    }

    /// Three bytes cannot hold a header; the error reports how many were seen.
    #[test]
    fn test_frame_error_incomplete_header() {
        let too_short = Bytes::from(vec![1, 2, 3]);

        match Frame::decode(too_short) {
            Err(FrameError::IncompleteHeader(3)) => {}
            other => panic!("expected IncompleteHeader(3), got {:?}", other),
        }
    }

    /// A header announcing 1000 bytes followed by only 5 is rejected.
    #[test]
    fn test_frame_error_incomplete_payload() {
        let mut wire = BytesMut::new();
        FrameHeader {
            version: FRAME_VERSION,
            payload_len: 1000,
        }
        .encode(&mut wire);
        wire.put_slice(b"short");

        match Frame::decode(wire.freeze()) {
            Err(FrameError::IncompletePayload {
                expected,
                available,
            }) => {
                assert_eq!(expected, 1000);
                assert_eq!(available, 5);
            }
            other => panic!("expected IncompletePayload, got {:?}", other),
        }
    }

    /// A version byte other than `FRAME_VERSION` is reported back in the error.
    #[test]
    fn test_frame_error_unsupported_version() {
        let mut wire = BytesMut::new();
        wire.put_u8(99); // not a supported version
        wire.put_u32(0); // payload length

        match FrameHeader::decode(&mut wire) {
            Err(FrameError::UnsupportedVersion(99)) => {}
            other => panic!("expected UnsupportedVersion(99), got {:?}", other),
        }
    }

    /// An empty payload encodes to a bare header and decodes back to empty.
    #[test]
    fn test_zero_length_payload() {
        let wire = Frame::new(Bytes::new()).encode();
        assert_eq!(wire.len(), FRAME_HEADER_SIZE);

        let frame = Frame::decode(wire).unwrap();
        assert_eq!(frame.payload.len(), 0);
    }
}