Skip to main content

oxigdal_websocket/protocol/
message.rs

//! Message types and payloads for the WebSocket protocol.
2
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
6
7/// Message type enumeration
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
9#[repr(u8)]
10pub enum MessageType {
11    /// Ping message
12    Ping = 0,
13    /// Pong message
14    Pong = 1,
15    /// Subscribe request
16    Subscribe = 2,
17    /// Unsubscribe request
18    Unsubscribe = 3,
19    /// Publish message
20    Publish = 4,
21    /// Data message
22    Data = 5,
23    /// Tile update
24    TileUpdate = 6,
25    /// Feature update
26    FeatureUpdate = 7,
27    /// Change stream event
28    ChangeStream = 8,
29    /// Error message
30    Error = 9,
31    /// Acknowledgement
32    Ack = 10,
33    /// Join room
34    JoinRoom = 11,
35    /// Leave room
36    LeaveRoom = 12,
37    /// Broadcast message
38    Broadcast = 13,
39    /// System event
40    SystemEvent = 14,
41}
42
43/// Message payload
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub enum Payload {
46    /// Empty payload
47    Empty,
48    /// Text payload
49    Text(String),
50    /// Binary payload
51    Binary(Vec<u8>),
52    /// JSON payload
53    Json(serde_json::Value),
54    /// Tile data payload
55    TileData(TilePayload),
56    /// Feature data payload
57    FeatureData(FeaturePayload),
58    /// Change event payload
59    ChangeEvent(ChangePayload),
60    /// Subscribe payload
61    Subscribe(SubscribePayload),
62    /// Room payload
63    Room(RoomPayload),
64    /// Error payload
65    Error(ErrorPayload),
66}
67
68/// WebSocket message
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct Message {
71    /// Message ID
72    pub id: Uuid,
73    /// Message type
74    pub msg_type: MessageType,
75    /// Timestamp (milliseconds since epoch)
76    pub timestamp: i64,
77    /// Payload
78    pub payload: Payload,
79    /// Optional correlation ID (for request/response)
80    pub correlation_id: Option<Uuid>,
81}
82
83impl Message {
84    /// Create a new message
85    pub fn new(msg_type: MessageType, payload: Payload) -> Self {
86        Self {
87            id: Uuid::new_v4(),
88            msg_type,
89            timestamp: chrono::Utc::now().timestamp_millis(),
90            payload,
91            correlation_id: None,
92        }
93    }
94
95    /// Create a ping message
96    pub fn ping() -> Self {
97        Self::new(MessageType::Ping, Payload::Empty)
98    }
99
100    /// Create a pong message
101    pub fn pong() -> Self {
102        Self::new(MessageType::Pong, Payload::Empty)
103    }
104
105    /// Create a subscribe message
106    pub fn subscribe(topic: String, filter: Option<serde_json::Value>) -> Self {
107        Self::new(
108            MessageType::Subscribe,
109            Payload::Subscribe(SubscribePayload { topic, filter }),
110        )
111    }
112
113    /// Create an unsubscribe message
114    pub fn unsubscribe(topic: String) -> Self {
115        Self::new(
116            MessageType::Unsubscribe,
117            Payload::Subscribe(SubscribePayload {
118                topic,
119                filter: None,
120            }),
121        )
122    }
123
124    /// Create a data message
125    pub fn data(data: Bytes) -> Self {
126        Self::new(MessageType::Data, Payload::Binary(data.to_vec()))
127    }
128
129    /// Create an error message
130    pub fn error(code: u32, message: String) -> Self {
131        Self::new(
132            MessageType::Error,
133            Payload::Error(ErrorPayload { code, message }),
134        )
135    }
136
137    /// Create a join room message
138    pub fn join_room(room: String) -> Self {
139        Self::new(MessageType::JoinRoom, Payload::Room(RoomPayload { room }))
140    }
141
142    /// Create a leave room message
143    pub fn leave_room(room: String) -> Self {
144        Self::new(MessageType::LeaveRoom, Payload::Room(RoomPayload { room }))
145    }
146
147    /// Get message type
148    pub fn message_type(&self) -> MessageType {
149        self.msg_type
150    }
151
152    /// Set correlation ID
153    pub fn with_correlation_id(mut self, id: Uuid) -> Self {
154        self.correlation_id = Some(id);
155        self
156    }
157
158    /// Check if this is a response to another message
159    pub fn is_response_to(&self, message_id: &Uuid) -> bool {
160        self.correlation_id.as_ref() == Some(message_id)
161    }
162}
163
164/// Tile payload
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct TilePayload {
167    /// Tile Z coordinate (zoom level)
168    pub z: u8,
169    /// Tile X coordinate
170    pub x: u32,
171    /// Tile Y coordinate
172    pub y: u32,
173    /// Tile data
174    pub data: Vec<u8>,
175    /// Tile format (e.g., "png", "webp", "mvt")
176    pub format: String,
177    /// Optional delta encoding (if incremental update)
178    pub delta: Option<Vec<u8>>,
179}
180
181/// Feature payload
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct FeaturePayload {
184    /// Feature ID
185    pub id: String,
186    /// Layer name
187    pub layer: String,
188    /// GeoJSON feature
189    pub feature: serde_json::Value,
190    /// Change type
191    pub change_type: ChangeType,
192}
193
194/// Change type enumeration
195#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
196pub enum ChangeType {
197    /// Feature created
198    Created,
199    /// Feature updated
200    Updated,
201    /// Feature deleted
202    Deleted,
203}
204
205/// Change event payload
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct ChangePayload {
208    /// Change ID
209    pub change_id: u64,
210    /// Collection/layer name
211    pub collection: String,
212    /// Change type
213    pub change_type: ChangeType,
214    /// Document ID
215    pub document_id: String,
216    /// Optional change data
217    pub data: Option<serde_json::Value>,
218}
219
220/// Subscribe payload
221#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct SubscribePayload {
223    /// Topic to subscribe to
224    pub topic: String,
225    /// Optional filter
226    pub filter: Option<serde_json::Value>,
227}
228
229/// Room payload
230#[derive(Debug, Clone, Serialize, Deserialize)]
231pub struct RoomPayload {
232    /// Room name
233    pub room: String,
234}
235
236/// Error payload
237#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct ErrorPayload {
239    /// Error code
240    pub code: u32,
241    /// Error message
242    pub message: String,
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    /// `Message::ping` builds a `Ping` message with an empty payload and no
    /// correlation ID.
    #[test]
    fn test_message_creation() {
        let ping = Message::ping();
        assert_eq!(ping.msg_type, MessageType::Ping);
        assert_eq!(ping.message_type(), MessageType::Ping);
        assert!(matches!(ping.payload, Payload::Empty));
        assert!(ping.correlation_id.is_none());
    }

    /// A response tagged with a request's ID is recognised as its response.
    #[test]
    fn test_message_correlation() {
        let request = Message::ping();
        let response = Message::pong().with_correlation_id(request.id);

        assert!(response.is_response_to(&request.id));
        // A message without a correlation ID answers nothing, not even itself.
        assert!(!request.is_response_to(&request.id));
    }

    /// `Message::subscribe` wraps the topic in a `Subscribe` payload.
    #[test]
    fn test_subscribe_message() {
        let msg = Message::subscribe("tiles".to_string(), None);
        assert_eq!(msg.msg_type, MessageType::Subscribe);

        // A single `match` both checks the variant and inspects its contents,
        // so a wrong variant can never slip through unasserted.
        match &msg.payload {
            Payload::Subscribe(sub) => {
                assert_eq!(sub.topic, "tiles");
                assert!(sub.filter.is_none());
            }
            other => panic!("Expected Subscribe payload, got {:?}", other),
        }
    }

    /// A `TilePayload` keeps the field values it was built with.
    #[test]
    fn test_tile_payload() {
        let tile = TilePayload {
            z: 10,
            x: 512,
            y: 384,
            data: vec![1, 2, 3, 4],
            format: "png".to_string(),
            delta: None,
        };

        assert_eq!(tile.z, 10);
        assert_eq!(tile.x, 512);
        assert_eq!(tile.y, 384);
        assert_eq!(tile.data, vec![1, 2, 3, 4]);
        assert_eq!(tile.format, "png");
        assert!(tile.delta.is_none());
    }

    /// A `FeaturePayload` keeps the field values it was built with.
    #[test]
    fn test_feature_payload() {
        let feature = serde_json::json!({
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": [0.0, 0.0]
            },
            "properties": {}
        });

        let payload = FeaturePayload {
            id: "feature1".to_string(),
            layer: "layer1".to_string(),
            feature,
            change_type: ChangeType::Created,
        };

        assert_eq!(payload.id, "feature1");
        assert_eq!(payload.layer, "layer1");
        assert_eq!(payload.feature["type"], "Feature");
        assert_eq!(payload.change_type, ChangeType::Created);
    }
}