oxigdal_websocket/protocol/
message.rs1use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
9#[repr(u8)]
10pub enum MessageType {
11 Ping = 0,
13 Pong = 1,
15 Subscribe = 2,
17 Unsubscribe = 3,
19 Publish = 4,
21 Data = 5,
23 TileUpdate = 6,
25 FeatureUpdate = 7,
27 ChangeStream = 8,
29 Error = 9,
31 Ack = 10,
33 JoinRoom = 11,
35 LeaveRoom = 12,
37 Broadcast = 13,
39 SystemEvent = 14,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub enum Payload {
46 Empty,
48 Text(String),
50 Binary(Vec<u8>),
52 Json(serde_json::Value),
54 TileData(TilePayload),
56 FeatureData(FeaturePayload),
58 ChangeEvent(ChangePayload),
60 Subscribe(SubscribePayload),
62 Room(RoomPayload),
64 Error(ErrorPayload),
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct Message {
71 pub id: Uuid,
73 pub msg_type: MessageType,
75 pub timestamp: i64,
77 pub payload: Payload,
79 pub correlation_id: Option<Uuid>,
81}
82
83impl Message {
84 pub fn new(msg_type: MessageType, payload: Payload) -> Self {
86 Self {
87 id: Uuid::new_v4(),
88 msg_type,
89 timestamp: chrono::Utc::now().timestamp_millis(),
90 payload,
91 correlation_id: None,
92 }
93 }
94
95 pub fn ping() -> Self {
97 Self::new(MessageType::Ping, Payload::Empty)
98 }
99
100 pub fn pong() -> Self {
102 Self::new(MessageType::Pong, Payload::Empty)
103 }
104
105 pub fn subscribe(topic: String, filter: Option<serde_json::Value>) -> Self {
107 Self::new(
108 MessageType::Subscribe,
109 Payload::Subscribe(SubscribePayload { topic, filter }),
110 )
111 }
112
113 pub fn unsubscribe(topic: String) -> Self {
115 Self::new(
116 MessageType::Unsubscribe,
117 Payload::Subscribe(SubscribePayload {
118 topic,
119 filter: None,
120 }),
121 )
122 }
123
124 pub fn data(data: Bytes) -> Self {
126 Self::new(MessageType::Data, Payload::Binary(data.to_vec()))
127 }
128
129 pub fn error(code: u32, message: String) -> Self {
131 Self::new(
132 MessageType::Error,
133 Payload::Error(ErrorPayload { code, message }),
134 )
135 }
136
137 pub fn join_room(room: String) -> Self {
139 Self::new(MessageType::JoinRoom, Payload::Room(RoomPayload { room }))
140 }
141
142 pub fn leave_room(room: String) -> Self {
144 Self::new(MessageType::LeaveRoom, Payload::Room(RoomPayload { room }))
145 }
146
147 pub fn message_type(&self) -> MessageType {
149 self.msg_type
150 }
151
152 pub fn with_correlation_id(mut self, id: Uuid) -> Self {
154 self.correlation_id = Some(id);
155 self
156 }
157
158 pub fn is_response_to(&self, message_id: &Uuid) -> bool {
160 self.correlation_id.as_ref() == Some(message_id)
161 }
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct TilePayload {
167 pub z: u8,
169 pub x: u32,
171 pub y: u32,
173 pub data: Vec<u8>,
175 pub format: String,
177 pub delta: Option<Vec<u8>>,
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct FeaturePayload {
184 pub id: String,
186 pub layer: String,
188 pub feature: serde_json::Value,
190 pub change_type: ChangeType,
192}
193
194#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
196pub enum ChangeType {
197 Created,
199 Updated,
201 Deleted,
203}
204
205#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct ChangePayload {
208 pub change_id: u64,
210 pub collection: String,
212 pub change_type: ChangeType,
214 pub document_id: String,
216 pub data: Option<serde_json::Value>,
218}
219
220#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct SubscribePayload {
223 pub topic: String,
225 pub filter: Option<serde_json::Value>,
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize)]
231pub struct RoomPayload {
232 pub room: String,
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
238pub struct ErrorPayload {
239 pub code: u32,
241 pub message: String,
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    /// `ping()` yields a Ping message with an empty body and no correlation id.
    #[test]
    fn test_message_creation() {
        let msg = Message::ping();
        assert_eq!(msg.msg_type, MessageType::Ping);
        assert_eq!(msg.message_type(), MessageType::Ping);
        assert!(matches!(msg.payload, Payload::Empty));
        assert!(msg.correlation_id.is_none());
    }

    /// A response tagged with the request id is recognised as its response.
    #[test]
    fn test_message_correlation() {
        let request = Message::ping();
        let response = Message::pong().with_correlation_id(request.id);

        assert!(response.is_response_to(&request.id));
    }

    /// Messages without a correlation id, or with a different one, are not
    /// reported as responses.
    #[test]
    fn test_message_correlation_mismatch() {
        let request = Message::ping();
        let unrelated = Message::ping();

        let untagged = Message::pong();
        assert!(!untagged.is_response_to(&request.id));

        let response = Message::pong().with_correlation_id(unrelated.id);
        assert!(!response.is_response_to(&request.id));
    }

    /// `subscribe()` stores the topic and filter in a Subscribe payload.
    #[test]
    fn test_subscribe_message() {
        let msg = Message::subscribe("tiles".to_string(), None);
        assert_eq!(msg.msg_type, MessageType::Subscribe);

        // A single guarded `matches!` checks variant and contents together, so
        // a wrong variant can no longer slip past the topic assertion.
        assert!(
            matches!(
                &msg.payload,
                Payload::Subscribe(sub) if sub.topic == "tiles" && sub.filter.is_none()
            ),
            "Expected Subscribe payload"
        );

        let filtered = Message::subscribe(
            "tiles".to_string(),
            Some(serde_json::json!({ "layer": "roads" })),
        );
        assert!(
            matches!(
                &filtered.payload,
                Payload::Subscribe(sub) if sub.topic == "tiles" && sub.filter.is_some()
            ),
            "Expected Subscribe payload with a filter"
        );
    }

    /// `unsubscribe()` reuses the Subscribe payload with no filter.
    #[test]
    fn test_unsubscribe_message() {
        let msg = Message::unsubscribe("tiles".to_string());
        assert_eq!(msg.msg_type, MessageType::Unsubscribe);
        assert!(
            matches!(
                &msg.payload,
                Payload::Subscribe(sub) if sub.topic == "tiles" && sub.filter.is_none()
            ),
            "Expected Subscribe payload without a filter"
        );
    }

    /// `data()` carries the input bytes unchanged in a Binary payload.
    #[test]
    fn test_data_message() {
        let msg = Message::data(bytes::Bytes::from_static(&[1, 2, 3]));
        assert_eq!(msg.msg_type, MessageType::Data);
        assert!(
            matches!(
                &msg.payload,
                Payload::Binary(body) if body.as_slice() == [1u8, 2, 3].as_slice()
            ),
            "Expected Binary payload with the original bytes"
        );

        let empty = Message::data(bytes::Bytes::new());
        assert!(
            matches!(&empty.payload, Payload::Binary(body) if body.is_empty()),
            "Expected empty Binary payload"
        );
    }

    /// `error()` stores code and text in an Error payload.
    #[test]
    fn test_error_message() {
        let msg = Message::error(404, "not found".to_string());
        assert_eq!(msg.msg_type, MessageType::Error);
        assert!(
            matches!(
                &msg.payload,
                Payload::Error(err) if err.code == 404 && err.message == "not found"
            ),
            "Expected Error payload"
        );
    }

    /// `join_room()` and `leave_room()` differ only in message kind.
    #[test]
    fn test_room_messages() {
        let join = Message::join_room("lobby".to_string());
        assert_eq!(join.msg_type, MessageType::JoinRoom);
        assert!(
            matches!(&join.payload, Payload::Room(r) if r.room == "lobby"),
            "Expected Room payload"
        );

        let leave = Message::leave_room("lobby".to_string());
        assert_eq!(leave.msg_type, MessageType::LeaveRoom);
        assert!(
            matches!(&leave.payload, Payload::Room(r) if r.room == "lobby"),
            "Expected Room payload"
        );
    }

    /// Plain construction of a tile payload keeps the given coordinates.
    #[test]
    fn test_tile_payload() {
        let payload = TilePayload {
            z: 10,
            x: 512,
            y: 384,
            data: vec![1, 2, 3, 4],
            format: "png".to_string(),
            delta: None,
        };

        assert_eq!(payload.z, 10);
        assert_eq!(payload.x, 512);
        assert_eq!(payload.y, 384);
    }

    /// Plain construction of a feature payload keeps id and change type.
    #[test]
    fn test_feature_payload() {
        let feature = serde_json::json!({
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": [0.0, 0.0]
            },
            "properties": {}
        });

        let payload = FeaturePayload {
            id: "feature1".to_string(),
            layer: "layer1".to_string(),
            feature,
            change_type: ChangeType::Created,
        };

        assert_eq!(payload.id, "feature1");
        assert_eq!(payload.change_type, ChangeType::Created);
    }
}