oxigdal_websocket/protocol/
mod.rs1pub mod binary;
10pub mod compression;
11pub mod framing;
12pub mod json;
13pub mod message;
14
15pub use binary::{BinaryCodec, BinaryMessage, GeospatialBinaryProtocol};
16pub use compression::{CompressionCodec, CompressionLevel, CompressionType};
17pub use framing::{Frame, FrameCodec, FrameHeader, FrameType};
18pub use json::{JsonCodec, JsonMessage};
19pub use message::{Message, MessageType, Payload};
20
21use crate::error::{Error, Result};
22use bytes::{Bytes, BytesMut};
23use serde::{Deserialize, Serialize};
24
/// Version number of this protocol.
///
/// `ProtocolCodec::encode` passes this value to `Frame::new` for every frame
/// it builds.
pub const PROTOCOL_VERSION: u8 = 1;

/// Default upper bound on a message's size, in bytes (16 MiB).
///
/// Used as the `max_message_size` of `ProtocolConfig::default()`.
pub const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
33pub enum MessageFormat {
34 Json,
36 Binary,
38 MessagePack,
40}
41
42#[derive(Debug, Clone)]
44pub struct ProtocolConfig {
45 pub format: MessageFormat,
47 pub compression: Option<CompressionType>,
49 pub compression_level: CompressionLevel,
51 pub enable_framing: bool,
53 pub max_message_size: usize,
55}
56
57impl Default for ProtocolConfig {
58 fn default() -> Self {
59 Self {
60 format: MessageFormat::Binary,
61 compression: Some(CompressionType::Zstd),
62 compression_level: CompressionLevel::Default,
63 enable_framing: true,
64 max_message_size: MAX_MESSAGE_SIZE,
65 }
66 }
67}
68
69pub struct ProtocolCodec {
71 config: ProtocolConfig,
72 compression_codec: Option<CompressionCodec>,
73 frame_codec: FrameCodec,
74}
75
76impl ProtocolCodec {
77 pub fn new(config: ProtocolConfig) -> Self {
79 let compression_codec = config
80 .compression
81 .map(|ct| CompressionCodec::new(ct, config.compression_level));
82
83 Self {
84 config,
85 compression_codec,
86 frame_codec: FrameCodec::new(),
87 }
88 }
89
90 pub fn encode(&self, message: &Message) -> Result<Bytes> {
92 let mut data = match self.config.format {
94 MessageFormat::Json => {
95 let json = serde_json::to_vec(message)?;
96 BytesMut::from(&json[..])
97 }
98 MessageFormat::Binary => BinaryCodec::encode(message)?,
99 MessageFormat::MessagePack => {
100 let msgpack = rmp_serde::to_vec(message)?;
101 BytesMut::from(&msgpack[..])
102 }
103 };
104
105 if let Some(ref codec) = self.compression_codec {
107 data = codec.compress(&data)?;
108 }
109
110 if data.len() > self.config.max_message_size {
112 return Err(Error::Protocol(format!(
113 "Message size {} exceeds maximum {}",
114 data.len(),
115 self.config.max_message_size
116 )));
117 }
118
119 if self.config.enable_framing {
121 let frame = Frame::new(
122 FrameType::Data,
123 PROTOCOL_VERSION,
124 self.compression_codec.is_some(),
125 data.freeze(),
126 );
127 self.frame_codec.encode(&frame)
128 } else {
129 Ok(data.freeze())
130 }
131 }
132
133 pub fn decode(&self, data: &[u8]) -> Result<Message> {
135 let payload = if self.config.enable_framing {
137 let frame = self.frame_codec.decode(data)?;
138 frame.payload
139 } else {
140 Bytes::copy_from_slice(data)
141 };
142
143 let decompressed = if let Some(ref codec) = self.compression_codec {
145 codec.decompress(&payload)?
146 } else {
147 payload
148 };
149
150 match self.config.format {
152 MessageFormat::Json => {
153 let message: Message = serde_json::from_slice(&decompressed)?;
154 Ok(message)
155 }
156 MessageFormat::Binary => BinaryCodec::decode(&decompressed),
157 MessageFormat::MessagePack => {
158 let message: Message = rmp_serde::from_slice(&decompressed)?;
159 Ok(message)
160 }
161 }
162 }
163
164 pub fn config(&self) -> &ProtocolConfig {
166 &self.config
167 }
168}
169
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes a ping with a codec built from `config`, decodes the result
    /// and checks that the message type survived the round trip.
    fn assert_ping_round_trip(config: ProtocolConfig) -> Result<()> {
        let codec = ProtocolCodec::new(config);
        let original = Message::ping();

        let wire = codec.encode(&original)?;
        let restored = codec.decode(&wire)?;

        assert_eq!(original.message_type(), restored.message_type());
        Ok(())
    }

    /// JSON format, no compression, no framing.
    #[test]
    fn test_protocol_codec_json() -> Result<()> {
        assert_ping_round_trip(ProtocolConfig {
            format: MessageFormat::Json,
            compression: None,
            enable_framing: false,
            ..Default::default()
        })
    }

    /// Binary format, no compression, no framing.
    #[test]
    fn test_protocol_codec_binary() -> Result<()> {
        assert_ping_round_trip(ProtocolConfig {
            format: MessageFormat::Binary,
            compression: None,
            enable_framing: false,
            ..Default::default()
        })
    }

    /// Binary format with fast Zstd compression and framing enabled.
    #[test]
    fn test_protocol_codec_with_compression() -> Result<()> {
        assert_ping_round_trip(ProtocolConfig {
            format: MessageFormat::Binary,
            compression: Some(CompressionType::Zstd),
            compression_level: CompressionLevel::Fast,
            enable_framing: true,
            ..Default::default()
        })
    }
}