Skip to main content

oxigdal_websocket/protocol/
mod.rs

1//! WebSocket protocol implementation
2//!
3//! This module provides protocol support for WebSocket communication including:
4//! - Binary and JSON message formats
5//! - Message framing and encoding
6//! - Compression support (gzip, zstd)
7//! - Geospatial-optimized binary protocols
8
9pub mod binary;
10pub mod compression;
11pub mod framing;
12pub mod json;
13pub mod message;
14
15pub use binary::{BinaryCodec, BinaryMessage, GeospatialBinaryProtocol};
16pub use compression::{CompressionCodec, CompressionLevel, CompressionType};
17pub use framing::{Frame, FrameCodec, FrameHeader, FrameType};
18pub use json::{JsonCodec, JsonMessage};
19pub use message::{Message, MessageType, Payload};
20
21use crate::error::{Error, Result};
22use bytes::{Bytes, BytesMut};
23use serde::{Deserialize, Serialize};
24
/// Protocol version number passed to every data frame built by `ProtocolCodec::encode`.
pub const PROTOCOL_VERSION: u8 = 1;

/// Default message size limit in bytes: 16 MiB (16 * 2^20).
///
/// Used as the default value of `ProtocolConfig::max_message_size`.
pub const MAX_MESSAGE_SIZE: usize = 16 << 20;
30
31/// Message format enumeration
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
33pub enum MessageFormat {
34    /// JSON format
35    Json,
36    /// Binary format
37    Binary,
38    /// MessagePack format
39    MessagePack,
40}
41
42/// Protocol configuration
43#[derive(Debug, Clone)]
44pub struct ProtocolConfig {
45    /// Message format
46    pub format: MessageFormat,
47    /// Compression type
48    pub compression: Option<CompressionType>,
49    /// Compression level
50    pub compression_level: CompressionLevel,
51    /// Enable framing
52    pub enable_framing: bool,
53    /// Maximum message size
54    pub max_message_size: usize,
55}
56
57impl Default for ProtocolConfig {
58    fn default() -> Self {
59        Self {
60            format: MessageFormat::Binary,
61            compression: Some(CompressionType::Zstd),
62            compression_level: CompressionLevel::Default,
63            enable_framing: true,
64            max_message_size: MAX_MESSAGE_SIZE,
65        }
66    }
67}
68
69/// Protocol codec for encoding/decoding messages
70pub struct ProtocolCodec {
71    config: ProtocolConfig,
72    compression_codec: Option<CompressionCodec>,
73    frame_codec: FrameCodec,
74}
75
76impl ProtocolCodec {
77    /// Create a new protocol codec
78    pub fn new(config: ProtocolConfig) -> Self {
79        let compression_codec = config
80            .compression
81            .map(|ct| CompressionCodec::new(ct, config.compression_level));
82
83        Self {
84            config,
85            compression_codec,
86            frame_codec: FrameCodec::new(),
87        }
88    }
89
90    /// Encode a message
91    pub fn encode(&self, message: &Message) -> Result<Bytes> {
92        // Serialize message based on format
93        let mut data = match self.config.format {
94            MessageFormat::Json => {
95                let json = serde_json::to_vec(message)?;
96                BytesMut::from(&json[..])
97            }
98            MessageFormat::Binary => BinaryCodec::encode(message)?,
99            MessageFormat::MessagePack => {
100                let msgpack = rmp_serde::to_vec(message)?;
101                BytesMut::from(&msgpack[..])
102            }
103        };
104
105        // Apply compression if enabled
106        if let Some(ref codec) = self.compression_codec {
107            data = codec.compress(&data)?;
108        }
109
110        // Check message size
111        if data.len() > self.config.max_message_size {
112            return Err(Error::Protocol(format!(
113                "Message size {} exceeds maximum {}",
114                data.len(),
115                self.config.max_message_size
116            )));
117        }
118
119        // Apply framing if enabled
120        if self.config.enable_framing {
121            let frame = Frame::new(
122                FrameType::Data,
123                PROTOCOL_VERSION,
124                self.compression_codec.is_some(),
125                data.freeze(),
126            );
127            self.frame_codec.encode(&frame)
128        } else {
129            Ok(data.freeze())
130        }
131    }
132
133    /// Decode a message
134    pub fn decode(&self, data: &[u8]) -> Result<Message> {
135        // Decode framing if enabled
136        let payload = if self.config.enable_framing {
137            let frame = self.frame_codec.decode(data)?;
138            frame.payload
139        } else {
140            Bytes::copy_from_slice(data)
141        };
142
143        // Decompress if needed
144        let decompressed = if let Some(ref codec) = self.compression_codec {
145            codec.decompress(&payload)?
146        } else {
147            payload
148        };
149
150        // Deserialize based on format
151        match self.config.format {
152            MessageFormat::Json => {
153                let message: Message = serde_json::from_slice(&decompressed)?;
154                Ok(message)
155            }
156            MessageFormat::Binary => BinaryCodec::decode(&decompressed),
157            MessageFormat::MessagePack => {
158                let message: Message = rmp_serde::from_slice(&decompressed)?;
159                Ok(message)
160            }
161        }
162    }
163
164    /// Get protocol configuration
165    pub fn config(&self) -> &ProtocolConfig {
166        &self.config
167    }
168}
169
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a codec from `config`, sends a ping through encode and decode,
    /// and checks that the message type survives the round trip.
    fn assert_ping_round_trip(config: ProtocolConfig) -> Result<()> {
        let codec = ProtocolCodec::new(config);
        let original = Message::ping();

        let wire = codec.encode(&original)?;
        let restored = codec.decode(&wire)?;

        assert_eq!(original.message_type(), restored.message_type());
        Ok(())
    }

    /// JSON format, no compression, no framing.
    #[test]
    fn test_protocol_codec_json() -> Result<()> {
        assert_ping_round_trip(ProtocolConfig {
            format: MessageFormat::Json,
            compression: None,
            enable_framing: false,
            ..Default::default()
        })
    }

    /// Binary format, no compression, no framing.
    #[test]
    fn test_protocol_codec_binary() -> Result<()> {
        assert_ping_round_trip(ProtocolConfig {
            format: MessageFormat::Binary,
            compression: None,
            enable_framing: false,
            ..Default::default()
        })
    }

    /// Binary format with fast Zstd compression and framing enabled.
    #[test]
    fn test_protocol_codec_with_compression() -> Result<()> {
        assert_ping_round_trip(ProtocolConfig {
            format: MessageFormat::Binary,
            compression: Some(CompressionType::Zstd),
            compression_level: CompressionLevel::Fast,
            enable_framing: true,
            ..Default::default()
        })
    }
}