Skip to main content

oxigdal_websocket/protocol/
binary.rs

1//! Binary protocol implementation for geospatial data
2
3use crate::error::{Error, Result};
4use crate::protocol::message::{Message, MessageType, Payload};
5use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
6use bytes::{BufMut, Bytes, BytesMut};
7use std::io::{Cursor, Read};
8
9/// Binary protocol version
10pub const BINARY_PROTOCOL_VERSION: u8 = 1;
11
12/// Binary message codec
13pub struct BinaryCodec;
14
15impl BinaryCodec {
16    /// Encode a message to binary format
17    pub fn encode(message: &Message) -> Result<BytesMut> {
18        let mut buf = BytesMut::new();
19
20        // Write version
21        buf.put_u8(BINARY_PROTOCOL_VERSION);
22
23        // Write message type
24        buf.put_u8(message.msg_type as u8);
25
26        // Write message ID
27        buf.put_slice(message.id.as_bytes());
28
29        // Write timestamp
30        buf.put_i64(message.timestamp);
31
32        // Write correlation ID flag and value
33        if let Some(corr_id) = message.correlation_id {
34            buf.put_u8(1);
35            buf.put_slice(corr_id.as_bytes());
36        } else {
37            buf.put_u8(0);
38        }
39
40        // Encode payload
41        Self::encode_payload(&message.payload, &mut buf)?;
42
43        Ok(buf)
44    }
45
46    /// Decode a message from binary format
47    pub fn decode(data: &[u8]) -> Result<Message> {
48        let mut cursor = Cursor::new(data);
49
50        // Read version
51        let version = cursor
52            .read_u8()
53            .map_err(|e| Error::Protocol(format!("Failed to read version: {}", e)))?;
54
55        if version != BINARY_PROTOCOL_VERSION {
56            return Err(Error::Protocol(format!(
57                "Unsupported protocol version: {}",
58                version
59            )));
60        }
61
62        // Read message type
63        let msg_type_u8 = cursor
64            .read_u8()
65            .map_err(|e| Error::Protocol(format!("Failed to read message type: {}", e)))?;
66        let msg_type = Self::decode_message_type(msg_type_u8)?;
67
68        // Read message ID
69        let mut id_bytes = [0u8; 16];
70        cursor
71            .read_exact(&mut id_bytes)
72            .map_err(|e| Error::Protocol(format!("Failed to read message ID: {}", e)))?;
73        let id = uuid::Uuid::from_bytes(id_bytes);
74
75        // Read timestamp
76        let timestamp = cursor
77            .read_i64::<BigEndian>()
78            .map_err(|e| Error::Protocol(format!("Failed to read timestamp: {}", e)))?;
79
80        // Read correlation ID
81        let has_corr_id = cursor
82            .read_u8()
83            .map_err(|e| Error::Protocol(format!("Failed to read correlation flag: {}", e)))?;
84        let correlation_id = if has_corr_id == 1 {
85            let mut corr_id_bytes = [0u8; 16];
86            cursor
87                .read_exact(&mut corr_id_bytes)
88                .map_err(|e| Error::Protocol(format!("Failed to read correlation ID: {}", e)))?;
89            Some(uuid::Uuid::from_bytes(corr_id_bytes))
90        } else {
91            None
92        };
93
94        // Decode payload
95        let payload = Self::decode_payload(&mut cursor)?;
96
97        Ok(Message {
98            id,
99            msg_type,
100            timestamp,
101            payload,
102            correlation_id,
103        })
104    }
105
106    /// Encode payload
107    fn encode_payload(payload: &Payload, buf: &mut BytesMut) -> Result<()> {
108        match payload {
109            Payload::Empty => {
110                buf.put_u8(0);
111            }
112            Payload::Text(text) => {
113                buf.put_u8(1);
114                buf.put_u32(text.len() as u32);
115                buf.put_slice(text.as_bytes());
116            }
117            Payload::Binary(data) => {
118                buf.put_u8(2);
119                buf.put_u32(data.len() as u32);
120                buf.put_slice(data);
121            }
122            Payload::Json(value) => {
123                buf.put_u8(3);
124                let json = serde_json::to_vec(value)?;
125                buf.put_u32(json.len() as u32);
126                buf.put_slice(&json);
127            }
128            Payload::TileData(tile) => {
129                buf.put_u8(4);
130                // Encode tile data
131                buf.put_u8(tile.z);
132                buf.put_u32(tile.x);
133                buf.put_u32(tile.y);
134                buf.put_u32(tile.format.len() as u32);
135                buf.put_slice(tile.format.as_bytes());
136                buf.put_u32(tile.data.len() as u32);
137                buf.put_slice(&tile.data);
138                // Encode delta flag and data
139                if let Some(delta) = &tile.delta {
140                    buf.put_u8(1);
141                    buf.put_u32(delta.len() as u32);
142                    buf.put_slice(delta);
143                } else {
144                    buf.put_u8(0);
145                }
146            }
147            Payload::FeatureData(feature) => {
148                buf.put_u8(5);
149                // Use MessagePack for complex nested structures
150                let encoded = rmp_serde::to_vec(feature)?;
151                buf.put_u32(encoded.len() as u32);
152                buf.put_slice(&encoded);
153            }
154            Payload::ChangeEvent(change) => {
155                buf.put_u8(6);
156                let encoded = rmp_serde::to_vec(change)?;
157                buf.put_u32(encoded.len() as u32);
158                buf.put_slice(&encoded);
159            }
160            Payload::Subscribe(sub) => {
161                buf.put_u8(7);
162                let encoded = rmp_serde::to_vec(sub)?;
163                buf.put_u32(encoded.len() as u32);
164                buf.put_slice(&encoded);
165            }
166            Payload::Room(room) => {
167                buf.put_u8(8);
168                buf.put_u32(room.room.len() as u32);
169                buf.put_slice(room.room.as_bytes());
170            }
171            Payload::Error(err) => {
172                buf.put_u8(9);
173                buf.put_u32(err.code);
174                buf.put_u32(err.message.len() as u32);
175                buf.put_slice(err.message.as_bytes());
176            }
177        }
178
179        Ok(())
180    }
181
182    /// Decode payload
183    fn decode_payload(cursor: &mut Cursor<&[u8]>) -> Result<Payload> {
184        let payload_type = cursor
185            .read_u8()
186            .map_err(|e| Error::Protocol(format!("Failed to read payload type: {}", e)))?;
187
188        match payload_type {
189            0 => Ok(Payload::Empty),
190            1 => {
191                // Text
192                let len = cursor
193                    .read_u32::<BigEndian>()
194                    .map_err(|e| Error::Protocol(format!("Failed to read text length: {}", e)))?
195                    as usize;
196                let mut text_bytes = vec![0u8; len];
197                cursor
198                    .read_exact(&mut text_bytes)
199                    .map_err(|e| Error::Protocol(format!("Failed to read text: {}", e)))?;
200                let text = String::from_utf8(text_bytes)
201                    .map_err(|e| Error::Protocol(format!("Invalid UTF-8: {}", e)))?;
202                Ok(Payload::Text(text))
203            }
204            2 => {
205                // Binary
206                let len = cursor
207                    .read_u32::<BigEndian>()
208                    .map_err(|e| Error::Protocol(format!("Failed to read binary length: {}", e)))?
209                    as usize;
210                let mut data = vec![0u8; len];
211                cursor
212                    .read_exact(&mut data)
213                    .map_err(|e| Error::Protocol(format!("Failed to read binary: {}", e)))?;
214                Ok(Payload::Binary(data))
215            }
216            3 => {
217                // JSON
218                let len = cursor
219                    .read_u32::<BigEndian>()
220                    .map_err(|e| Error::Protocol(format!("Failed to read JSON length: {}", e)))?
221                    as usize;
222                let mut json_bytes = vec![0u8; len];
223                cursor
224                    .read_exact(&mut json_bytes)
225                    .map_err(|e| Error::Protocol(format!("Failed to read JSON: {}", e)))?;
226                let value: serde_json::Value = serde_json::from_slice(&json_bytes)?;
227                Ok(Payload::Json(value))
228            }
229            4 => {
230                // TileData
231                let z = cursor
232                    .read_u8()
233                    .map_err(|e| Error::Protocol(format!("Failed to read tile z: {}", e)))?;
234                let x = cursor
235                    .read_u32::<BigEndian>()
236                    .map_err(|e| Error::Protocol(format!("Failed to read tile x: {}", e)))?;
237                let y = cursor
238                    .read_u32::<BigEndian>()
239                    .map_err(|e| Error::Protocol(format!("Failed to read tile y: {}", e)))?;
240
241                let format_len = cursor
242                    .read_u32::<BigEndian>()
243                    .map_err(|e| Error::Protocol(format!("Failed to read format length: {}", e)))?
244                    as usize;
245                let mut format_bytes = vec![0u8; format_len];
246                cursor
247                    .read_exact(&mut format_bytes)
248                    .map_err(|e| Error::Protocol(format!("Failed to read format: {}", e)))?;
249                let format = String::from_utf8(format_bytes)
250                    .map_err(|e| Error::Protocol(format!("Invalid format UTF-8: {}", e)))?;
251
252                let data_len = cursor
253                    .read_u32::<BigEndian>()
254                    .map_err(|e| Error::Protocol(format!("Failed to read data length: {}", e)))?
255                    as usize;
256                let mut data = vec![0u8; data_len];
257                cursor
258                    .read_exact(&mut data)
259                    .map_err(|e| Error::Protocol(format!("Failed to read tile data: {}", e)))?;
260
261                let has_delta = cursor
262                    .read_u8()
263                    .map_err(|e| Error::Protocol(format!("Failed to read delta flag: {}", e)))?;
264                let delta = if has_delta == 1 {
265                    let delta_len = cursor.read_u32::<BigEndian>().map_err(|e| {
266                        Error::Protocol(format!("Failed to read delta length: {}", e))
267                    })? as usize;
268                    let mut delta_data = vec![0u8; delta_len];
269                    cursor
270                        .read_exact(&mut delta_data)
271                        .map_err(|e| Error::Protocol(format!("Failed to read delta: {}", e)))?;
272                    Some(delta_data)
273                } else {
274                    None
275                };
276
277                Ok(Payload::TileData(crate::protocol::message::TilePayload {
278                    z,
279                    x,
280                    y,
281                    data,
282                    format,
283                    delta,
284                }))
285            }
286            5..=7 => {
287                // FeatureData, ChangeEvent, Subscribe (MessagePack encoded)
288                let len = cursor
289                    .read_u32::<BigEndian>()
290                    .map_err(|e| Error::Protocol(format!("Failed to read length: {}", e)))?
291                    as usize;
292                let mut data = vec![0u8; len];
293                cursor
294                    .read_exact(&mut data)
295                    .map_err(|e| Error::Protocol(format!("Failed to read data: {}", e)))?;
296
297                match payload_type {
298                    5 => {
299                        let feature = rmp_serde::from_slice(&data)?;
300                        Ok(Payload::FeatureData(feature))
301                    }
302                    6 => {
303                        let change = rmp_serde::from_slice(&data)?;
304                        Ok(Payload::ChangeEvent(change))
305                    }
306                    7 => {
307                        let sub = rmp_serde::from_slice(&data)?;
308                        Ok(Payload::Subscribe(sub))
309                    }
310                    _ => Err(Error::Protocol("Invalid payload type".to_string())),
311                }
312            }
313            8 => {
314                // Room
315                let len = cursor
316                    .read_u32::<BigEndian>()
317                    .map_err(|e| Error::Protocol(format!("Failed to read room length: {}", e)))?
318                    as usize;
319                let mut room_bytes = vec![0u8; len];
320                cursor
321                    .read_exact(&mut room_bytes)
322                    .map_err(|e| Error::Protocol(format!("Failed to read room: {}", e)))?;
323                let room = String::from_utf8(room_bytes)
324                    .map_err(|e| Error::Protocol(format!("Invalid room UTF-8: {}", e)))?;
325                Ok(Payload::Room(crate::protocol::message::RoomPayload {
326                    room,
327                }))
328            }
329            9 => {
330                // Error
331                let code = cursor
332                    .read_u32::<BigEndian>()
333                    .map_err(|e| Error::Protocol(format!("Failed to read error code: {}", e)))?;
334                let len = cursor.read_u32::<BigEndian>().map_err(|e| {
335                    Error::Protocol(format!("Failed to read error message length: {}", e))
336                })? as usize;
337                let mut msg_bytes = vec![0u8; len];
338                cursor
339                    .read_exact(&mut msg_bytes)
340                    .map_err(|e| Error::Protocol(format!("Failed to read error message: {}", e)))?;
341                let message = String::from_utf8(msg_bytes)
342                    .map_err(|e| Error::Protocol(format!("Invalid error message UTF-8: {}", e)))?;
343                Ok(Payload::Error(crate::protocol::message::ErrorPayload {
344                    code,
345                    message,
346                }))
347            }
348            _ => Err(Error::Protocol(format!(
349                "Unknown payload type: {}",
350                payload_type
351            ))),
352        }
353    }
354
355    /// Decode message type
356    fn decode_message_type(value: u8) -> Result<MessageType> {
357        match value {
358            0 => Ok(MessageType::Ping),
359            1 => Ok(MessageType::Pong),
360            2 => Ok(MessageType::Subscribe),
361            3 => Ok(MessageType::Unsubscribe),
362            4 => Ok(MessageType::Publish),
363            5 => Ok(MessageType::Data),
364            6 => Ok(MessageType::TileUpdate),
365            7 => Ok(MessageType::FeatureUpdate),
366            8 => Ok(MessageType::ChangeStream),
367            9 => Ok(MessageType::Error),
368            10 => Ok(MessageType::Ack),
369            11 => Ok(MessageType::JoinRoom),
370            12 => Ok(MessageType::LeaveRoom),
371            13 => Ok(MessageType::Broadcast),
372            14 => Ok(MessageType::SystemEvent),
373            _ => Err(Error::Protocol(format!("Invalid message type: {}", value))),
374        }
375    }
376}
377
378/// Geospatial binary protocol optimizations
379pub struct GeospatialBinaryProtocol;
380
381impl GeospatialBinaryProtocol {
382    /// Encode coordinates with variable-length encoding
383    pub fn encode_coordinates(coords: &[f64]) -> Vec<u8> {
384        let mut buf = Vec::with_capacity(coords.len() * 8);
385        for &coord in coords {
386            buf.write_f64::<BigEndian>(coord).ok();
387        }
388        buf
389    }
390
391    /// Decode coordinates
392    pub fn decode_coordinates(data: &[u8]) -> Result<Vec<f64>> {
393        let mut cursor = Cursor::new(data);
394        let mut coords = Vec::new();
395
396        while cursor.position() < data.len() as u64 {
397            let coord = cursor
398                .read_f64::<BigEndian>()
399                .map_err(|e| Error::Protocol(format!("Failed to read coordinate: {}", e)))?;
400            coords.push(coord);
401        }
402
403        Ok(coords)
404    }
405
406    /// Encode tile coordinates (z, x, y) efficiently
407    pub fn encode_tile_coords(z: u8, x: u32, y: u32) -> [u8; 9] {
408        let mut buf = [0u8; 9];
409        buf[0] = z;
410        buf[1..5].copy_from_slice(&x.to_be_bytes());
411        buf[5..9].copy_from_slice(&y.to_be_bytes());
412        buf
413    }
414
415    /// Decode tile coordinates
416    pub fn decode_tile_coords(data: &[u8; 9]) -> (u8, u32, u32) {
417        let z = data[0];
418        let x = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
419        let y = u32::from_be_bytes([data[5], data[6], data[7], data[8]]);
420        (z, x, y)
421    }
422}
423
424/// Binary message wrapper
425pub struct BinaryMessage {
426    data: Bytes,
427}
428
429impl BinaryMessage {
430    /// Create a new binary message
431    pub fn new(data: Bytes) -> Self {
432        Self { data }
433    }
434
435    /// Get message data
436    pub fn data(&self) -> &Bytes {
437        &self.data
438    }
439
440    /// Convert to message
441    pub fn to_message(&self) -> Result<Message> {
442        BinaryCodec::decode(&self.data)
443    }
444}
445
446#[cfg(test)]
447mod tests {
448    use super::*;
449
450    #[test]
451    fn test_binary_codec_ping() -> Result<()> {
452        let msg = Message::ping();
453        let encoded = BinaryCodec::encode(&msg)?;
454        let decoded = BinaryCodec::decode(&encoded)?;
455
456        assert_eq!(msg.msg_type, decoded.msg_type);
457        assert_eq!(msg.id, decoded.id);
458        Ok(())
459    }
460
461    #[test]
462    fn test_binary_codec_text() -> Result<()> {
463        let msg = Message::new(MessageType::Data, Payload::Text("Hello".to_string()));
464        let encoded = BinaryCodec::encode(&msg)?;
465        let decoded = BinaryCodec::decode(&encoded)?;
466
467        assert_eq!(msg.msg_type, decoded.msg_type);
468        assert!(
469            matches!(decoded.payload, Payload::Text(_)),
470            "Expected text payload"
471        );
472        if let Payload::Text(text) = &decoded.payload {
473            assert_eq!(text, "Hello");
474        }
475        Ok(())
476    }
477
478    #[test]
479    fn test_geospatial_coordinates() -> Result<()> {
480        let coords = vec![1.0, 2.0, 3.0, 4.0];
481        let encoded = GeospatialBinaryProtocol::encode_coordinates(&coords);
482        let decoded = GeospatialBinaryProtocol::decode_coordinates(&encoded)?;
483
484        assert_eq!(coords, decoded);
485        Ok(())
486    }
487
488    #[test]
489    fn test_tile_coords() {
490        let (z, x, y) = (10, 512, 384);
491        let encoded = GeospatialBinaryProtocol::encode_tile_coords(z, x, y);
492        let decoded = GeospatialBinaryProtocol::decode_tile_coords(&encoded);
493
494        assert_eq!((z, x, y), decoded);
495    }
496}