1use std::{collections::HashMap, mem::size_of, sync::Arc};
2
3use bytes::{Buf, BufMut, BytesMut};
4use serde::{Deserialize, Serialize};
5use serde_with::serde_as;
6use tungstenite::Message;
7
8use crate::{common::*, DigitalisError, DigitalisResult};
9
/// Wires a payload type into its parent message enum.
///
/// `impl_enum_from!(Parent, Variant, Type)` generates:
/// * `impl From<Type> for Parent`, wrapping the value in `Parent::Variant`;
/// * an inherent `Type::into_message`, which converts the value into the
///   parent enum and then calls the parent's `to_message`.
///
/// The two-argument form `impl_enum_from!(Parent, Name)` is shorthand for the
/// case where the variant and the payload type share the same identifier.
macro_rules! impl_enum_from {
    ($parent:ident, $child:ident, $child_ty:ident) => {
        impl From<$child_ty> for $parent {
            fn from(msg: $child_ty) -> Self {
                $parent::$child(msg)
            }
        }

        // The inherent method must live on the payload *type*. `$child` is the
        // variant name and only coincides with the type in the short form.
        impl $child_ty {
            /// Wraps `self` in the parent enum and converts it with the
            /// parent's `to_message`.
            pub fn into_message(self) -> DigitalisResult<Message> {
                $parent::from(self).to_message()
            }
        }
    };
    ($parent:ident, $child:ident) => {
        impl_enum_from!($parent, $child, $child);
    };
}
28
/// Placeholder macro: every invocation expands to nothing.
///
/// It accepts two comma-separated identifiers (an enum name and a payload
/// name) and currently generates no code for them.
macro_rules! impl_into_text_message {
    ($enum_name:ident, $payload_name:ident) => {};
}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
34#[serde(tag = "op", rename_all = "camelCase")]
35pub enum ServerJsonMessage {
36 ServerInfo(ServerInfo),
37 Status(Status),
38 Advertise(Advertise),
39 Unadvertise(Unadvertise),
40 ParameterValues(ParameterValues),
41 AdvertiseServices(AdvertiseServices),
42 UnadvertiseServices(UnadvertiseServices),
43 ConnectionGraphUpdate(ConnectionGraphUpdate),
44}
45
46impl ServerJsonMessage {
47 pub fn to_message(&self) -> DigitalisResult<Message> {
48 Ok(Message::Text(self.serialize()?))
49 }
50
51 pub fn serialize(&self) -> DigitalisResult<String> {
52 Ok(serde_json::to_string(self)?)
53 }
54
55 pub fn deserialize(text: &str) -> DigitalisResult<Self> {
56 Ok(serde_json::from_str(text)?)
57 }
58}
59
60impl_enum_from!(ServerJsonMessage, ServerInfo);
61impl_enum_from!(ServerJsonMessage, Status);
62impl_enum_from!(ServerJsonMessage, Advertise);
63impl_enum_from!(ServerJsonMessage, Unadvertise);
64impl_enum_from!(ServerJsonMessage, ParameterValues);
65impl_enum_from!(ServerJsonMessage, AdvertiseServices);
66impl_enum_from!(ServerJsonMessage, UnadvertiseServices);
67
68impl_into_text_message!(ServerJsonMessage, ServerInfo);
69impl_into_text_message!(ServerJsonMessage, Status);
70impl_into_text_message!(ServerJsonMessage, Advertise);
71impl_into_text_message!(ServerJsonMessage, Unadvertise);
72impl_into_text_message!(ServerJsonMessage, ParameterValues);
73impl_into_text_message!(ServerJsonMessage, AdvertiseServices);
74impl_into_text_message!(ServerJsonMessage, UnadvertiseServices);
75
76#[serde_as]
77#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
78#[serde(rename_all = "camelCase")]
79pub struct ServerInfo {
80 pub name: String,
81 pub capabilities: Vec<Capability>,
82 pub supported_encodings: Vec<MessageEncoding>,
83 #[serde(skip_serializing_if = "Option::is_none")]
84 pub metadata: Option<HashMap<String, serde_json::Value>>,
85 #[serde(skip_serializing_if = "Option::is_none")]
86 pub session_id: Option<String>,
87}
88
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90pub struct Status {
91 pub level: Level,
92 pub message: String,
93}
94
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
96pub enum Level {
97 Info = 0,
98 Warning = 1,
99 Error = 2,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
103#[serde(rename_all = "camelCase")]
104pub struct Advertise {
105 pub channels: Vec<AdvertiseChannel>,
106}
107
108#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
109#[serde(rename_all = "camelCase")]
110pub struct Unadvertise {
111 pub channel_ids: Vec<Id>,
112}
113
114#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(rename_all = "camelCase")]
116pub struct ParameterValues {
117 pub parameters: Vec<Parameter>,
118 #[serde(skip_serializing_if = "Option::is_none")]
119 pub id: Option<String>,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
123#[serde(rename_all = "camelCase")]
124pub struct AdvertiseServices {
125 pub services: Vec<Service>,
126}
127
128#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
129#[serde(rename_all = "camelCase")]
130pub struct Service {
131 pub id: Id,
132 pub name: String,
133 pub r#type: String,
134 pub request_schema: String,
135 pub response_schema: String,
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
139#[serde(rename_all = "camelCase")]
140pub struct UnadvertiseServices {
141 pub ids: Vec<Id>,
142}
143
144#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145#[serde(rename_all = "camelCase")]
146pub struct ConnectionGraphUpdate {
147 pub publish_topics: Vec<PublishedTopic>,
148 pub suscribed_topics: Vec<SubscribedTopic>,
149 pub advertised_services: Vec<AdvertisedService>,
150 pub removed_topics: Vec<String>,
151 pub removed_services: Vec<String>,
152}
153
154#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
155#[serde(rename_all = "camelCase")]
156pub struct PublishedTopic {
157 pub name: String,
158 pub publisher_ids: Vec<String>,
159}
160
161#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
162#[serde(rename_all = "camelCase")]
163pub struct SubscribedTopic {
164 pub name: String,
165 pub subscriber_ids: Vec<String>,
166}
167
168#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
169#[serde(rename_all = "camelCase")]
170pub struct AdvertisedService {
171 pub name: String,
172 pub provider_ids: Vec<String>,
173}
174
175#[derive(Debug, Clone, PartialEq, Eq)]
176pub enum ServerBinaryMessage {
177 MessageData(MessageData),
178 Time(Time),
179 ServiceCallResponse(ServiceCallResponse),
180 FetchAssetResponse(FetchAssetResponse),
181}
182
183impl ServerBinaryMessage {
184 pub fn to_message(self) -> DigitalisResult<Message> {
185 let mut buf = BytesMut::new();
186 self.serialize(&mut buf);
187 Ok(Message::Binary(buf.into()))
188 }
189
190 pub fn serialize<T: BufMut>(&self, buf: &mut T) {
191 match self {
192 Self::MessageData(msg) => {
193 buf.put_u8(0x01);
194 msg.serialize(buf);
195 }
196 Self::Time(msg) => {
197 buf.put_u8(0x02);
198 msg.serialize(buf);
199 }
200 Self::ServiceCallResponse(msg) => {
201 buf.put_u8(0x03);
202 msg.serialize(buf);
203 }
204 Self::FetchAssetResponse(msg) => {
205 buf.put_u8(0x04);
206 msg.serialize(buf);
207 }
208 }
209 }
210
211 pub fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
212 Ok(match buf.get_u8() {
213 0x01 => Self::from(MessageData::deserialize(buf)?),
214 0x02 => Self::from(Time::deserialize(buf)?),
215 0x03 => Self::from(ServiceCallResponse::deserialize(buf)?),
216 0x04 => Self::from(FetchAssetResponse::deserialize(buf)?),
217 x => {
218 return Err(DigitalisError::BinaryDeserializeError(
219 format!("Unknown protocol {}", x).into(),
220 ))
221 }
222 })
223 }
224}
225
226impl_enum_from!(ServerBinaryMessage, MessageData);
227impl_enum_from!(ServerBinaryMessage, Time);
228impl_enum_from!(ServerBinaryMessage, ServiceCallResponse);
229impl_enum_from!(ServerBinaryMessage, FetchAssetResponse);
230
231#[derive(Debug, Clone, PartialEq, Eq)]
232pub struct MessageData {
233 pub subscription_id: Id,
234 pub receive_timestamp: u64,
235 pub payload: Arc<Vec<u8>>,
236}
237
238impl MessageData {
239 fn serialize<T: BufMut>(&self, buf: &mut T) {
240 buf.put_u32_le(self.subscription_id);
241 buf.put_u64_le(self.receive_timestamp);
242 buf.put_slice(&self.payload);
243 }
244
245 fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
246 if buf.remaining() < size_of::<u32>() + size_of::<u64>() {
247 return Err(DigitalisError::BinaryDeserializeError(
248 "Data is too short".into(),
249 ));
250 }
251
252 let subscription_id = buf.get_u32_le();
253 let receive_timestamp = buf.get_u64_le();
254 let payload = buf.chunk().to_vec();
255 buf.advance(payload.len());
256
257 Ok(Self {
258 subscription_id,
259 receive_timestamp,
260 payload: Arc::new(payload),
261 })
262 }
263}
264
/// Payload of the binary message with opcode `0x02`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Time {
    /// Written as a little-endian `u64`.
    pub timestamp: u64,
}
269
270impl Time {
271 fn serialize<T: BufMut>(&self, buf: &mut T) {
272 buf.put_u64_le(self.timestamp);
273 }
274
275 fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
276 if buf.remaining() != size_of::<u64>() {
277 return Err(DigitalisError::BinaryDeserializeError(
278 "Data is too short".into(),
279 ));
280 }
281
282 Ok(Self {
283 timestamp: buf.get_u64_le(),
284 })
285 }
286}
287
288#[derive(Debug, Clone, PartialEq, Eq)]
289pub struct ServiceCallResponse {
290 pub service_id: Id,
291 pub call_id: Id,
292 pub encoding: Vec<u8>,
293 pub payload: Vec<u8>,
294}
295
296impl ServiceCallResponse {
297 fn serialize<T: BufMut>(&self, buf: &mut T) {
298 buf.put_u32_le(self.service_id);
299 buf.put_u32_le(self.call_id);
300 buf.put_u32_le(self.encoding.len() as u32);
301 buf.put_slice(&self.encoding);
302 buf.put_slice(&self.payload);
303 }
304
305 fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
306 if buf.remaining() < size_of::<u32>() * 3 {
307 return Err(DigitalisError::BinaryDeserializeError(
308 "Data is too short".into(),
309 ));
310 }
311
312 let service_id = buf.get_u32_le();
313 let call_id = buf.get_u32_le();
314
315 let encoding_len = buf.get_u32_le() as usize;
316 if buf.remaining() < encoding_len {
317 return Err(DigitalisError::BinaryDeserializeError(
318 "Data is too short".into(),
319 ));
320 }
321 let encoding = buf.chunk()[..encoding_len].to_vec();
322 buf.advance(encoding.len());
323
324 let payload = buf.chunk().to_vec();
325 buf.advance(payload.len());
326
327 Ok(Self {
328 service_id,
329 call_id,
330 encoding,
331 payload,
332 })
333 }
334}
335
336#[derive(Debug, Clone, PartialEq, Eq)]
337pub struct FetchAssetResponse {
338 pub request_id: Id,
339 pub status: u8,
340 pub error_message: Vec<u8>,
341 pub asset_data: Vec<u8>,
342}
343
344impl FetchAssetResponse {
345 fn serialize<T: BufMut>(&self, buf: &mut T) {
346 buf.put_u32_le(self.request_id);
347 buf.put_u8(self.status);
348 buf.put_u32_le(self.error_message.len() as u32);
349 buf.put_slice(&self.error_message);
350 buf.put_slice(&self.asset_data);
351 }
352
353 fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
354 if buf.remaining() < size_of::<u32>() * 2 + size_of::<u8>() {
355 return Err(DigitalisError::BinaryDeserializeError(
356 "Data is too short".into(),
357 ));
358 }
359
360 let request_id = buf.get_u32_le();
361 let status = buf.get_u8();
362
363 let error_message_len = buf.get_u32_le() as usize;
364 if buf.remaining() < error_message_len {
365 return Err(DigitalisError::BinaryDeserializeError(
366 "Data is too short".into(),
367 ));
368 }
369 let error_message = buf.chunk()[..error_message_len].to_vec();
370 buf.advance(error_message.len());
371
372 let asset_data = buf.chunk().to_vec();
373 buf.advance(asset_data.len());
374
375 Ok(Self {
376 request_id,
377 status,
378 error_message,
379 asset_data,
380 })
381 }
382}
383
/// Round-trip tests for the binary message encodings.
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_serialize_and_deserialize_message_data() {
        let msg = MessageData {
            subscription_id: 25,
            receive_timestamp: 23893748,
            payload: Arc::new(vec![1, 23, 125]),
        };

        let mut buf = Vec::new();
        msg.serialize(&mut buf);
        let msg2 = MessageData::deserialize(&mut buf.as_slice()).unwrap();

        assert_eq!(msg, msg2);
    }

    #[test]
    fn test_serialize_and_deserialize_time() {
        let msg = Time {
            timestamp: 23893748,
        };

        let mut buf = Vec::new();
        msg.serialize(&mut buf);
        let msg2 = Time::deserialize(&mut buf.as_slice()).unwrap();

        assert_eq!(msg, msg2);
    }

    #[test]
    fn test_serialize_and_deserialize_service_call_response() {
        let msg = ServiceCallResponse {
            service_id: 25,
            call_id: 23893748,
            encoding: vec![1, 23, 125],
            payload: vec![25, 225, 23, 125],
        };

        let mut buf = Vec::new();
        msg.serialize(&mut buf);
        let msg2 = ServiceCallResponse::deserialize(&mut buf.as_slice()).unwrap();

        assert_eq!(msg, msg2);
    }

    #[test]
    fn test_serialize_and_deserialize_fetch_asset_response() {
        let msg = FetchAssetResponse {
            request_id: 7,
            status: 1,
            error_message: vec![110, 111],
            asset_data: vec![9, 8, 7, 6],
        };

        let mut buf: Vec<u8> = Vec::new();
        msg.serialize(&mut buf);
        let msg2 = FetchAssetResponse::deserialize(&mut buf.as_slice()).unwrap();

        assert_eq!(msg, msg2);
    }

    #[test]
    fn test_server_binary_message_opcode_framing() {
        let msg = ServerBinaryMessage::Time(Time { timestamp: 42 });

        let mut buf: Vec<u8> = Vec::new();
        msg.serialize(&mut buf);
        // The opcode byte comes first, followed by the eight timestamp bytes.
        assert_eq!(buf[0], 0x02);
        assert_eq!(buf.len(), 1 + size_of::<u64>());

        let msg2 = ServerBinaryMessage::deserialize(&mut buf.as_slice()).unwrap();
        assert_eq!(msg, msg2);

        // An opcode outside 0x01..=0x04 is reported as an error.
        let unknown = [0xffu8];
        assert!(ServerBinaryMessage::deserialize(&mut &unknown[..]).is_err());
    }
}