1use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use uuid::Uuid;
7
8#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
11#[serde(rename_all = "lowercase")]
12pub enum StreamPriority {
13 #[default]
14 Normal = 0,
16 Low = 1,
18 High = 2,
20 Critical = 3,
22}
23
24impl StreamPriority {
25 pub const fn as_u8(self) -> u8 {
26 self as u8
27 }
28
29 pub const fn drain_order(self) -> u8 {
31 match self {
32 Self::Critical => 0,
33 Self::High => 1,
34 Self::Normal => 2,
35 Self::Low => 3,
36 }
37 }
38}
39
40impl PartialOrd for StreamPriority {
41 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
42 Some(self.cmp(other))
43 }
44}
45
46impl Ord for StreamPriority {
47 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
48 self.drain_order().cmp(&other.drain_order())
49 }
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
54pub struct OpenStreamFrame {
55 pub stream_id: u32,
56 pub protocol: Protocol,
57 pub headers: Vec<(String, String)>,
58 pub body_hint: Option<u64>,
59 #[serde(default)]
61 pub priority: StreamPriority,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
66pub struct HandshakeFrame {
67 pub token: String,
68 pub tunnel_id: Option<String>,
69 pub min_version: u8,
71 pub max_version: u8,
73 pub capabilities: Vec<String>,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
81pub enum Frame {
82 Handshake(Box<HandshakeFrame>),
84
85 HandshakeAck {
87 session_id: Uuid,
88 status: HandshakeStatus,
89 version: u8,
91 server_capabilities: Vec<String>,
92 },
93
94 Register {
96 service_name: String,
97 protocol: Protocol,
98 metadata: HashMap<String, String>,
99 },
100
101 RegisterAck {
103 public_url: String,
104 status: RegisterStatus,
105 },
106
107 OpenStream(Box<OpenStreamFrame>),
109
110 StreamAck {
112 stream_id: u32,
113 status: StreamStatus,
114 },
115
116 Data {
118 stream_id: u32,
119 data: Bytes,
120 end_of_stream: bool,
121 },
122
123 CloseStream { stream_id: u32, reason: CloseReason },
125
126 Heartbeat { timestamp: u64 },
128
129 HeartbeatAck { timestamp: u64 },
131
132 Error {
134 stream_id: Option<u32>,
135 code: ErrorCode,
136 message: String,
137 },
138
139 PluginData { plugin_id: String, data: Bytes },
141}
142
143#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
145pub enum HandshakeStatus {
146 Success,
147 InvalidToken,
148 UnsupportedVersion,
149 VersionMismatch,
150 RateLimited,
151 TunnelIdTaken,
152}
153
154#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
156pub enum RegisterStatus {
157 Success,
158 ServiceNameTaken,
159 InvalidServiceName,
160 QuotaExceeded,
161}
162
163#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
165pub enum StreamStatus {
166 Accepted,
167 Rejected,
168 BackpressureApplied,
169}
170
171#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
173pub enum Protocol {
174 HTTP,
175 HTTPS,
176 HTTP2,
177 WebSocket,
178 GRPC,
179 TCP,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
184pub enum CloseReason {
185 Normal,
186 Timeout,
187 Error(String),
188 LocalServiceUnreachable,
189 ProtocolViolation,
190}
191
/// Borrowed view of a data frame.
///
/// The payload is a `&'a [u8]` slice instead of an owned buffer, so the type
/// is `Copy` and cannot outlive the buffer it borrows from. It derives no
/// serde traits; use [`ZeroCopyFrame::to_owned`] to obtain an owned
/// [`Frame`].
#[derive(Debug, Clone, Copy)]
pub enum ZeroCopyFrame<'a> {
    /// Borrowed counterpart of [`Frame::Data`].
    Data {
        /// Identifier of the stream the chunk belongs to.
        stream_id: u32,
        /// Borrowed payload bytes.
        data: &'a [u8],
        /// Maps to `end_of_stream` when converted with `to_owned`.
        fin: bool,
    },
}
202
203impl ZeroCopyFrame<'_> {
204 #[inline]
206 pub fn to_owned(self) -> Frame {
207 match self {
208 ZeroCopyFrame::Data {
209 stream_id,
210 data,
211 fin,
212 } => Frame::Data {
213 stream_id,
214 data: Bytes::copy_from_slice(data),
215 end_of_stream: fin,
216 },
217 }
218 }
219}
220
221#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
223pub enum ErrorCode {
224 ProtocolError = 1,
225 AuthenticationFailed = 2,
226 SessionNotFound = 3,
227 StreamNotFound = 4,
228 Timeout = 5,
229 InternalServerError = 6,
230 ServiceUnavailable = 7,
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `frame` with bincode's standard configuration, decodes the
    /// bytes again and returns the decoded frame.
    ///
    /// Also asserts that the decoder consumed every encoded byte, so a
    /// partial decode cannot pass unnoticed.
    fn round_trip(frame: &Frame) -> Frame {
        let config = bincode_next::config::standard();
        let encoded =
            bincode_next::serde::encode_to_vec(frame, config).expect("frame should encode");
        let (decoded, consumed): (Frame, usize) =
            bincode_next::serde::decode_from_slice(&encoded, config)
                .expect("frame should decode");
        assert_eq!(consumed, encoded.len(), "decoder left trailing bytes");
        decoded
    }

    #[test]
    fn test_frame_serialization() {
        let frame = Frame::Handshake(Box::new(HandshakeFrame {
            token: "test-token".to_string(),
            tunnel_id: Some("test-tunnel".to_string()),
            min_version: 1,
            max_version: 1,
            capabilities: vec!["http".to_string()],
        }));

        assert_eq!(round_trip(&frame), frame);
    }

    #[test]
    fn test_data_frame_with_bytes() {
        let frame = Frame::Data {
            stream_id: 42,
            data: Bytes::from("hello world"),
            end_of_stream: false,
        };

        // Compare the whole frame, not just the payload, so `stream_id` and
        // `end_of_stream` are verified as well.
        assert_eq!(round_trip(&frame), frame);
    }

    #[test]
    fn test_all_frame_types() {
        let frames = vec![
            Frame::Heartbeat { timestamp: 123_456 },
            Frame::HeartbeatAck { timestamp: 123_456 },
            Frame::CloseStream {
                stream_id: 1,
                reason: CloseReason::Normal,
            },
            Frame::Error {
                stream_id: Some(1),
                code: ErrorCode::ProtocolError,
                message: "test error".to_string(),
            },
        ];

        for frame in frames {
            // Decoding successfully is not enough: the decoded value must
            // equal what was encoded.
            assert_eq!(round_trip(&frame), frame);
        }
    }
}