Skip to main content

ferrotunnel_protocol/
frame.rs

1//! Protocol frame definitions
2
3use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use uuid::Uuid;
7
8/// Stream priority (0 = lowest, 3 = highest). Used for QoS scheduling (e.g. control vs bulk).
9/// Ord is for send scheduling: Critical is sent first, then High, Normal, Low.
10#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
11#[serde(rename_all = "lowercase")]
12pub enum StreamPriority {
13    #[default]
14    /// Default / best-effort
15    Normal = 0,
16    /// Low priority (bulk data)
17    Low = 1,
18    /// High priority (control, latency-sensitive)
19    High = 2,
20    /// Highest (e.g. heartbeats, critical control)
21    Critical = 3,
22}
23
24impl StreamPriority {
25    pub const fn as_u8(self) -> u8 {
26        self as u8
27    }
28
29    /// Drain order for the batched sender: lower value = sent first (Critical first, then High, Normal, Low).
30    pub const fn drain_order(self) -> u8 {
31        match self {
32            Self::Critical => 0,
33            Self::High => 1,
34            Self::Normal => 2,
35            Self::Low => 3,
36        }
37    }
38}
39
40impl PartialOrd for StreamPriority {
41    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
42        Some(self.cmp(other))
43    }
44}
45
46impl Ord for StreamPriority {
47    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
48        self.drain_order().cmp(&other.drain_order())
49    }
50}
51
52/// Stream open request payload - boxed to reduce Frame enum size
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
54pub struct OpenStreamFrame {
55    pub stream_id: u32,
56    pub protocol: Protocol,
57    pub headers: Vec<(String, String)>,
58    pub body_hint: Option<u64>,
59    /// Optional priority for scheduling (default Normal).
60    #[serde(default)]
61    pub priority: StreamPriority,
62}
63
64/// Handshake payload - boxed to reduce Frame enum size
65#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
66pub struct HandshakeFrame {
67    pub token: String,
68    pub tunnel_id: Option<String>,
69    /// Minimum protocol version supported by this peer
70    pub min_version: u8,
71    /// Maximum protocol version supported by this peer
72    pub max_version: u8,
73    pub capabilities: Vec<String>,
74}
75
76/// Wire protocol frame
77///
78/// Large variants are boxed to keep stack size small for control frames.
79/// This provides ~60% stack reduction for small frames like Heartbeat.
80#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
81pub enum Frame {
82    /// Initial handshake from client to server
83    Handshake(Box<HandshakeFrame>),
84
85    /// Handshake acknowledgment from server
86    HandshakeAck {
87        session_id: Uuid,
88        status: HandshakeStatus,
89        /// Negotiated protocol version
90        version: u8,
91        server_capabilities: Vec<String>,
92    },
93
94    /// Register a service
95    Register {
96        service_name: String,
97        protocol: Protocol,
98        metadata: HashMap<String, String>,
99    },
100
101    /// Registration response
102    RegisterAck {
103        public_url: String,
104        status: RegisterStatus,
105    },
106
107    /// Open a new stream
108    OpenStream(Box<OpenStreamFrame>),
109
110    /// Stream acknowledgment
111    StreamAck {
112        stream_id: u32,
113        status: StreamStatus,
114    },
115
116    /// Data frame (Fast Path - no Box)
117    Data {
118        stream_id: u32,
119        data: Bytes,
120        end_of_stream: bool,
121    },
122
123    /// Close a stream
124    CloseStream { stream_id: u32, reason: CloseReason },
125
126    /// Heartbeat ping
127    Heartbeat { timestamp: u64 },
128
129    /// Heartbeat acknowledgment
130    HeartbeatAck { timestamp: u64 },
131
132    /// Error frame
133    Error {
134        stream_id: Option<u32>,
135        code: ErrorCode,
136        message: String,
137    },
138
139    /// Plugin data (for future use)
140    PluginData { plugin_id: String, data: Bytes },
141}
142
143/// Handshake status codes
144#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
145pub enum HandshakeStatus {
146    Success,
147    InvalidToken,
148    UnsupportedVersion,
149    VersionMismatch,
150    RateLimited,
151    TunnelIdTaken,
152}
153
154/// Registration status codes
155#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
156pub enum RegisterStatus {
157    Success,
158    ServiceNameTaken,
159    InvalidServiceName,
160    QuotaExceeded,
161}
162
163/// Stream status codes
164#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
165pub enum StreamStatus {
166    Accepted,
167    Rejected,
168    BackpressureApplied,
169}
170
171/// Supported protocols
172#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
173pub enum Protocol {
174    HTTP,
175    HTTPS,
176    HTTP2,
177    WebSocket,
178    GRPC,
179    TCP,
180}
181
182/// Stream close reasons
183#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
184pub enum CloseReason {
185    Normal,
186    Timeout,
187    Error(String),
188    LocalServiceUnreachable,
189    ProtocolViolation,
190}
191
/// Zero-copy view of a data frame (borrows from parse buffer).
///
/// Use in batch decode paths; convert to [`Frame`] with [`ZeroCopyFrame::to_owned`] when
/// crossing task boundaries.
///
/// The view is `Copy` (it only holds a borrowed slice plus two scalars) and compares by
/// content: two views are equal when their stream id, bytes and `fin` flag are equal, even
/// if the slices point into different buffers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopyFrame<'a> {
    /// Borrowed counterpart of [`Frame::Data`].
    Data {
        /// Stream the payload belongs to.
        stream_id: u32,
        /// Payload bytes, borrowed for `'a`.
        data: &'a [u8],
        /// Becomes `end_of_stream` when converted with [`ZeroCopyFrame::to_owned`].
        fin: bool,
    },
}
202
203impl ZeroCopyFrame<'_> {
204    /// Convert to owned [`Frame`] (copies data into [`bytes::Bytes`]).
205    #[inline]
206    pub fn to_owned(self) -> Frame {
207        match self {
208            ZeroCopyFrame::Data {
209                stream_id,
210                data,
211                fin,
212            } => Frame::Data {
213                stream_id,
214                data: Bytes::copy_from_slice(data),
215                end_of_stream: fin,
216            },
217        }
218    }
219}
220
221/// Error codes
222#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
223pub enum ErrorCode {
224    ProtocolError = 1,
225    AuthenticationFailed = 2,
226    SessionNotFound = 3,
227    StreamNotFound = 4,
228    Timeout = 5,
229    InternalServerError = 6,
230    ServiceUnavailable = 7,
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `frame` with the standard bincode configuration, decodes the bytes again and
    /// returns the decoded frame.
    ///
    /// Panics if encoding or decoding fails, or if the decoder does not consume the whole
    /// buffer.
    fn roundtrip(frame: &Frame) -> Frame {
        let config = bincode_next::config::standard();
        let encoded = bincode_next::serde::encode_to_vec(frame, config).unwrap();
        let (decoded, consumed): (Frame, usize) =
            bincode_next::serde::decode_from_slice(&encoded, config).unwrap();
        assert_eq!(
            consumed,
            encoded.len(),
            "decoder left trailing bytes for {:?}",
            frame
        );
        decoded
    }

    #[test]
    fn test_frame_serialization() {
        let frame = Frame::Handshake(Box::new(HandshakeFrame {
            token: "test-token".to_string(),
            tunnel_id: Some("test-tunnel".to_string()),
            min_version: 1,
            max_version: 1,
            capabilities: vec!["http".to_string()],
        }));

        assert_eq!(frame, roundtrip(&frame));
    }

    #[test]
    fn test_data_frame_with_bytes() {
        let frame = Frame::Data {
            stream_id: 42,
            data: Bytes::from("hello world"),
            end_of_stream: false,
        };

        // Compare the whole frame so that `stream_id` and `end_of_stream` are checked too,
        // not only the payload.
        assert_eq!(frame, roundtrip(&frame));
    }

    #[test]
    fn test_all_frame_types() {
        // One value per `Frame` variant, plus extra cases for optional / payload-carrying
        // fields.
        let frames = vec![
            Frame::Handshake(Box::new(HandshakeFrame {
                token: "token".to_string(),
                tunnel_id: None,
                min_version: 1,
                max_version: 2,
                capabilities: Vec::new(),
            })),
            Frame::HandshakeAck {
                session_id: Uuid::from_u128(0x0123_4567_89ab_cdef_0123_4567_89ab_cdef),
                status: HandshakeStatus::Success,
                version: 1,
                server_capabilities: vec!["http".to_string()],
            },
            Frame::Register {
                service_name: "web".to_string(),
                protocol: Protocol::HTTP,
                metadata: HashMap::from([("region".to_string(), "eu".to_string())]),
            },
            Frame::RegisterAck {
                public_url: "https://example.test".to_string(),
                status: RegisterStatus::Success,
            },
            Frame::OpenStream(Box::new(OpenStreamFrame {
                stream_id: 7,
                protocol: Protocol::WebSocket,
                headers: vec![("host".to_string(), "example.test".to_string())],
                body_hint: Some(1024),
                priority: StreamPriority::High,
            })),
            Frame::StreamAck {
                stream_id: 7,
                status: StreamStatus::Accepted,
            },
            Frame::Data {
                stream_id: 7,
                data: Bytes::from_static(b"\x00\x01\x02"),
                end_of_stream: true,
            },
            Frame::CloseStream {
                stream_id: 1,
                reason: CloseReason::Normal,
            },
            Frame::CloseStream {
                stream_id: 2,
                reason: CloseReason::Error("boom".to_string()),
            },
            Frame::Heartbeat { timestamp: 123_456 },
            Frame::HeartbeatAck { timestamp: 123_456 },
            Frame::Error {
                stream_id: Some(1),
                code: ErrorCode::ProtocolError,
                message: "test error".to_string(),
            },
            Frame::Error {
                stream_id: None,
                code: ErrorCode::ServiceUnavailable,
                message: String::new(),
            },
            Frame::PluginData {
                plugin_id: "plugin".to_string(),
                data: Bytes::from_static(b"payload"),
            },
        ];

        for frame in frames {
            // Decoding alone is not enough: the decoded frame must equal the input.
            assert_eq!(frame, roundtrip(&frame));
        }
    }
}