oxigdal_websocket/protocol/
binary.rs1use crate::error::{Error, Result};
4use crate::protocol::message::{Message, MessageType, Payload};
5use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
6use bytes::{BufMut, Bytes, BytesMut};
7use std::io::{Cursor, Read};
8
9pub const BINARY_PROTOCOL_VERSION: u8 = 1;
11
12pub struct BinaryCodec;
14
15impl BinaryCodec {
16 pub fn encode(message: &Message) -> Result<BytesMut> {
18 let mut buf = BytesMut::new();
19
20 buf.put_u8(BINARY_PROTOCOL_VERSION);
22
23 buf.put_u8(message.msg_type as u8);
25
26 buf.put_slice(message.id.as_bytes());
28
29 buf.put_i64(message.timestamp);
31
32 if let Some(corr_id) = message.correlation_id {
34 buf.put_u8(1);
35 buf.put_slice(corr_id.as_bytes());
36 } else {
37 buf.put_u8(0);
38 }
39
40 Self::encode_payload(&message.payload, &mut buf)?;
42
43 Ok(buf)
44 }
45
46 pub fn decode(data: &[u8]) -> Result<Message> {
48 let mut cursor = Cursor::new(data);
49
50 let version = cursor
52 .read_u8()
53 .map_err(|e| Error::Protocol(format!("Failed to read version: {}", e)))?;
54
55 if version != BINARY_PROTOCOL_VERSION {
56 return Err(Error::Protocol(format!(
57 "Unsupported protocol version: {}",
58 version
59 )));
60 }
61
62 let msg_type_u8 = cursor
64 .read_u8()
65 .map_err(|e| Error::Protocol(format!("Failed to read message type: {}", e)))?;
66 let msg_type = Self::decode_message_type(msg_type_u8)?;
67
68 let mut id_bytes = [0u8; 16];
70 cursor
71 .read_exact(&mut id_bytes)
72 .map_err(|e| Error::Protocol(format!("Failed to read message ID: {}", e)))?;
73 let id = uuid::Uuid::from_bytes(id_bytes);
74
75 let timestamp = cursor
77 .read_i64::<BigEndian>()
78 .map_err(|e| Error::Protocol(format!("Failed to read timestamp: {}", e)))?;
79
80 let has_corr_id = cursor
82 .read_u8()
83 .map_err(|e| Error::Protocol(format!("Failed to read correlation flag: {}", e)))?;
84 let correlation_id = if has_corr_id == 1 {
85 let mut corr_id_bytes = [0u8; 16];
86 cursor
87 .read_exact(&mut corr_id_bytes)
88 .map_err(|e| Error::Protocol(format!("Failed to read correlation ID: {}", e)))?;
89 Some(uuid::Uuid::from_bytes(corr_id_bytes))
90 } else {
91 None
92 };
93
94 let payload = Self::decode_payload(&mut cursor)?;
96
97 Ok(Message {
98 id,
99 msg_type,
100 timestamp,
101 payload,
102 correlation_id,
103 })
104 }
105
106 fn encode_payload(payload: &Payload, buf: &mut BytesMut) -> Result<()> {
108 match payload {
109 Payload::Empty => {
110 buf.put_u8(0);
111 }
112 Payload::Text(text) => {
113 buf.put_u8(1);
114 buf.put_u32(text.len() as u32);
115 buf.put_slice(text.as_bytes());
116 }
117 Payload::Binary(data) => {
118 buf.put_u8(2);
119 buf.put_u32(data.len() as u32);
120 buf.put_slice(data);
121 }
122 Payload::Json(value) => {
123 buf.put_u8(3);
124 let json = serde_json::to_vec(value)?;
125 buf.put_u32(json.len() as u32);
126 buf.put_slice(&json);
127 }
128 Payload::TileData(tile) => {
129 buf.put_u8(4);
130 buf.put_u8(tile.z);
132 buf.put_u32(tile.x);
133 buf.put_u32(tile.y);
134 buf.put_u32(tile.format.len() as u32);
135 buf.put_slice(tile.format.as_bytes());
136 buf.put_u32(tile.data.len() as u32);
137 buf.put_slice(&tile.data);
138 if let Some(delta) = &tile.delta {
140 buf.put_u8(1);
141 buf.put_u32(delta.len() as u32);
142 buf.put_slice(delta);
143 } else {
144 buf.put_u8(0);
145 }
146 }
147 Payload::FeatureData(feature) => {
148 buf.put_u8(5);
149 let encoded = rmp_serde::to_vec(feature)?;
151 buf.put_u32(encoded.len() as u32);
152 buf.put_slice(&encoded);
153 }
154 Payload::ChangeEvent(change) => {
155 buf.put_u8(6);
156 let encoded = rmp_serde::to_vec(change)?;
157 buf.put_u32(encoded.len() as u32);
158 buf.put_slice(&encoded);
159 }
160 Payload::Subscribe(sub) => {
161 buf.put_u8(7);
162 let encoded = rmp_serde::to_vec(sub)?;
163 buf.put_u32(encoded.len() as u32);
164 buf.put_slice(&encoded);
165 }
166 Payload::Room(room) => {
167 buf.put_u8(8);
168 buf.put_u32(room.room.len() as u32);
169 buf.put_slice(room.room.as_bytes());
170 }
171 Payload::Error(err) => {
172 buf.put_u8(9);
173 buf.put_u32(err.code);
174 buf.put_u32(err.message.len() as u32);
175 buf.put_slice(err.message.as_bytes());
176 }
177 }
178
179 Ok(())
180 }
181
182 fn decode_payload(cursor: &mut Cursor<&[u8]>) -> Result<Payload> {
184 let payload_type = cursor
185 .read_u8()
186 .map_err(|e| Error::Protocol(format!("Failed to read payload type: {}", e)))?;
187
188 match payload_type {
189 0 => Ok(Payload::Empty),
190 1 => {
191 let len = cursor
193 .read_u32::<BigEndian>()
194 .map_err(|e| Error::Protocol(format!("Failed to read text length: {}", e)))?
195 as usize;
196 let mut text_bytes = vec![0u8; len];
197 cursor
198 .read_exact(&mut text_bytes)
199 .map_err(|e| Error::Protocol(format!("Failed to read text: {}", e)))?;
200 let text = String::from_utf8(text_bytes)
201 .map_err(|e| Error::Protocol(format!("Invalid UTF-8: {}", e)))?;
202 Ok(Payload::Text(text))
203 }
204 2 => {
205 let len = cursor
207 .read_u32::<BigEndian>()
208 .map_err(|e| Error::Protocol(format!("Failed to read binary length: {}", e)))?
209 as usize;
210 let mut data = vec![0u8; len];
211 cursor
212 .read_exact(&mut data)
213 .map_err(|e| Error::Protocol(format!("Failed to read binary: {}", e)))?;
214 Ok(Payload::Binary(data))
215 }
216 3 => {
217 let len = cursor
219 .read_u32::<BigEndian>()
220 .map_err(|e| Error::Protocol(format!("Failed to read JSON length: {}", e)))?
221 as usize;
222 let mut json_bytes = vec![0u8; len];
223 cursor
224 .read_exact(&mut json_bytes)
225 .map_err(|e| Error::Protocol(format!("Failed to read JSON: {}", e)))?;
226 let value: serde_json::Value = serde_json::from_slice(&json_bytes)?;
227 Ok(Payload::Json(value))
228 }
229 4 => {
230 let z = cursor
232 .read_u8()
233 .map_err(|e| Error::Protocol(format!("Failed to read tile z: {}", e)))?;
234 let x = cursor
235 .read_u32::<BigEndian>()
236 .map_err(|e| Error::Protocol(format!("Failed to read tile x: {}", e)))?;
237 let y = cursor
238 .read_u32::<BigEndian>()
239 .map_err(|e| Error::Protocol(format!("Failed to read tile y: {}", e)))?;
240
241 let format_len = cursor
242 .read_u32::<BigEndian>()
243 .map_err(|e| Error::Protocol(format!("Failed to read format length: {}", e)))?
244 as usize;
245 let mut format_bytes = vec![0u8; format_len];
246 cursor
247 .read_exact(&mut format_bytes)
248 .map_err(|e| Error::Protocol(format!("Failed to read format: {}", e)))?;
249 let format = String::from_utf8(format_bytes)
250 .map_err(|e| Error::Protocol(format!("Invalid format UTF-8: {}", e)))?;
251
252 let data_len = cursor
253 .read_u32::<BigEndian>()
254 .map_err(|e| Error::Protocol(format!("Failed to read data length: {}", e)))?
255 as usize;
256 let mut data = vec![0u8; data_len];
257 cursor
258 .read_exact(&mut data)
259 .map_err(|e| Error::Protocol(format!("Failed to read tile data: {}", e)))?;
260
261 let has_delta = cursor
262 .read_u8()
263 .map_err(|e| Error::Protocol(format!("Failed to read delta flag: {}", e)))?;
264 let delta = if has_delta == 1 {
265 let delta_len = cursor.read_u32::<BigEndian>().map_err(|e| {
266 Error::Protocol(format!("Failed to read delta length: {}", e))
267 })? as usize;
268 let mut delta_data = vec![0u8; delta_len];
269 cursor
270 .read_exact(&mut delta_data)
271 .map_err(|e| Error::Protocol(format!("Failed to read delta: {}", e)))?;
272 Some(delta_data)
273 } else {
274 None
275 };
276
277 Ok(Payload::TileData(crate::protocol::message::TilePayload {
278 z,
279 x,
280 y,
281 data,
282 format,
283 delta,
284 }))
285 }
286 5..=7 => {
287 let len = cursor
289 .read_u32::<BigEndian>()
290 .map_err(|e| Error::Protocol(format!("Failed to read length: {}", e)))?
291 as usize;
292 let mut data = vec![0u8; len];
293 cursor
294 .read_exact(&mut data)
295 .map_err(|e| Error::Protocol(format!("Failed to read data: {}", e)))?;
296
297 match payload_type {
298 5 => {
299 let feature = rmp_serde::from_slice(&data)?;
300 Ok(Payload::FeatureData(feature))
301 }
302 6 => {
303 let change = rmp_serde::from_slice(&data)?;
304 Ok(Payload::ChangeEvent(change))
305 }
306 7 => {
307 let sub = rmp_serde::from_slice(&data)?;
308 Ok(Payload::Subscribe(sub))
309 }
310 _ => Err(Error::Protocol("Invalid payload type".to_string())),
311 }
312 }
313 8 => {
314 let len = cursor
316 .read_u32::<BigEndian>()
317 .map_err(|e| Error::Protocol(format!("Failed to read room length: {}", e)))?
318 as usize;
319 let mut room_bytes = vec![0u8; len];
320 cursor
321 .read_exact(&mut room_bytes)
322 .map_err(|e| Error::Protocol(format!("Failed to read room: {}", e)))?;
323 let room = String::from_utf8(room_bytes)
324 .map_err(|e| Error::Protocol(format!("Invalid room UTF-8: {}", e)))?;
325 Ok(Payload::Room(crate::protocol::message::RoomPayload {
326 room,
327 }))
328 }
329 9 => {
330 let code = cursor
332 .read_u32::<BigEndian>()
333 .map_err(|e| Error::Protocol(format!("Failed to read error code: {}", e)))?;
334 let len = cursor.read_u32::<BigEndian>().map_err(|e| {
335 Error::Protocol(format!("Failed to read error message length: {}", e))
336 })? as usize;
337 let mut msg_bytes = vec![0u8; len];
338 cursor
339 .read_exact(&mut msg_bytes)
340 .map_err(|e| Error::Protocol(format!("Failed to read error message: {}", e)))?;
341 let message = String::from_utf8(msg_bytes)
342 .map_err(|e| Error::Protocol(format!("Invalid error message UTF-8: {}", e)))?;
343 Ok(Payload::Error(crate::protocol::message::ErrorPayload {
344 code,
345 message,
346 }))
347 }
348 _ => Err(Error::Protocol(format!(
349 "Unknown payload type: {}",
350 payload_type
351 ))),
352 }
353 }
354
355 fn decode_message_type(value: u8) -> Result<MessageType> {
357 match value {
358 0 => Ok(MessageType::Ping),
359 1 => Ok(MessageType::Pong),
360 2 => Ok(MessageType::Subscribe),
361 3 => Ok(MessageType::Unsubscribe),
362 4 => Ok(MessageType::Publish),
363 5 => Ok(MessageType::Data),
364 6 => Ok(MessageType::TileUpdate),
365 7 => Ok(MessageType::FeatureUpdate),
366 8 => Ok(MessageType::ChangeStream),
367 9 => Ok(MessageType::Error),
368 10 => Ok(MessageType::Ack),
369 11 => Ok(MessageType::JoinRoom),
370 12 => Ok(MessageType::LeaveRoom),
371 13 => Ok(MessageType::Broadcast),
372 14 => Ok(MessageType::SystemEvent),
373 _ => Err(Error::Protocol(format!("Invalid message type: {}", value))),
374 }
375 }
376}
377
378pub struct GeospatialBinaryProtocol;
380
381impl GeospatialBinaryProtocol {
382 pub fn encode_coordinates(coords: &[f64]) -> Vec<u8> {
384 let mut buf = Vec::with_capacity(coords.len() * 8);
385 for &coord in coords {
386 buf.write_f64::<BigEndian>(coord).ok();
387 }
388 buf
389 }
390
391 pub fn decode_coordinates(data: &[u8]) -> Result<Vec<f64>> {
393 let mut cursor = Cursor::new(data);
394 let mut coords = Vec::new();
395
396 while cursor.position() < data.len() as u64 {
397 let coord = cursor
398 .read_f64::<BigEndian>()
399 .map_err(|e| Error::Protocol(format!("Failed to read coordinate: {}", e)))?;
400 coords.push(coord);
401 }
402
403 Ok(coords)
404 }
405
406 pub fn encode_tile_coords(z: u8, x: u32, y: u32) -> [u8; 9] {
408 let mut buf = [0u8; 9];
409 buf[0] = z;
410 buf[1..5].copy_from_slice(&x.to_be_bytes());
411 buf[5..9].copy_from_slice(&y.to_be_bytes());
412 buf
413 }
414
415 pub fn decode_tile_coords(data: &[u8; 9]) -> (u8, u32, u32) {
417 let z = data[0];
418 let x = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
419 let y = u32::from_be_bytes([data[5], data[6], data[7], data[8]]);
420 (z, x, y)
421 }
422}
423
424pub struct BinaryMessage {
426 data: Bytes,
427}
428
429impl BinaryMessage {
430 pub fn new(data: Bytes) -> Self {
432 Self { data }
433 }
434
435 pub fn data(&self) -> &Bytes {
437 &self.data
438 }
439
440 pub fn to_message(&self) -> Result<Message> {
442 BinaryCodec::decode(&self.data)
443 }
444}
445
446#[cfg(test)]
447mod tests {
448 use super::*;
449
450 #[test]
451 fn test_binary_codec_ping() -> Result<()> {
452 let msg = Message::ping();
453 let encoded = BinaryCodec::encode(&msg)?;
454 let decoded = BinaryCodec::decode(&encoded)?;
455
456 assert_eq!(msg.msg_type, decoded.msg_type);
457 assert_eq!(msg.id, decoded.id);
458 Ok(())
459 }
460
461 #[test]
462 fn test_binary_codec_text() -> Result<()> {
463 let msg = Message::new(MessageType::Data, Payload::Text("Hello".to_string()));
464 let encoded = BinaryCodec::encode(&msg)?;
465 let decoded = BinaryCodec::decode(&encoded)?;
466
467 assert_eq!(msg.msg_type, decoded.msg_type);
468 assert!(
469 matches!(decoded.payload, Payload::Text(_)),
470 "Expected text payload"
471 );
472 if let Payload::Text(text) = &decoded.payload {
473 assert_eq!(text, "Hello");
474 }
475 Ok(())
476 }
477
478 #[test]
479 fn test_geospatial_coordinates() -> Result<()> {
480 let coords = vec![1.0, 2.0, 3.0, 4.0];
481 let encoded = GeospatialBinaryProtocol::encode_coordinates(&coords);
482 let decoded = GeospatialBinaryProtocol::decode_coordinates(&encoded)?;
483
484 assert_eq!(coords, decoded);
485 Ok(())
486 }
487
488 #[test]
489 fn test_tile_coords() {
490 let (z, x, y) = (10, 512, 384);
491 let encoded = GeospatialBinaryProtocol::encode_tile_coords(z, x, y);
492 let decoded = GeospatialBinaryProtocol::decode_tile_coords(&encoded);
493
494 assert_eq!((z, x, y), decoded);
495 }
496}