Skip to main content

digitalis_core/
server.rs

1use std::{collections::HashMap, mem::size_of, sync::Arc};
2
3use bytes::{Buf, BufMut, BytesMut};
4use serde::{Deserialize, Serialize};
5use serde_with::serde_as;
6use tungstenite::Message;
7
8use crate::{common::*, DigitalisError, DigitalisResult};
9
/// Generates the glue between a message enum and one of its payload types.
///
/// `impl_enum_from!(Parent, Variant, PayloadTy)` expands to:
///
/// * `impl From<PayloadTy> for Parent`, wrapping the value in `Parent::Variant`;
/// * an inherent `PayloadTy::into_message`, which converts the payload into
///   `Parent` and delegates to `Parent::to_message`.
///
/// `impl_enum_from!(Parent, Name)` is shorthand for the case where the variant
/// and its payload type share one identifier.
macro_rules! impl_enum_from {
    ($parent:ident, $child:ident, $child_ty:ident) => {
        impl From<$child_ty> for $parent {
            fn from(msg: $child_ty) -> Self {
                $parent::$child(msg)
            }
        }

        // The inherent impl has to target the payload *type*: `$child` is only
        // the variant name and does not have to exist as a type of its own.
        impl $child_ty {
            pub fn into_message(self) -> DigitalisResult<Message> {
                $parent::from(self).to_message()
            }
        }
    };
    ($parent:ident, $child:ident) => {
        impl_enum_from!($parent, $child, $child);
    };
}
28
/// Placeholder macro: takes an enum identifier and a payload identifier and
/// expands to nothing.
macro_rules! impl_into_text_message {
    ($enum_ty:ident, $payload:ident) => {};
}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
34#[serde(tag = "op", rename_all = "camelCase")]
35pub enum ServerJsonMessage {
36    ServerInfo(ServerInfo),
37    Status(Status),
38    Advertise(Advertise),
39    Unadvertise(Unadvertise),
40    ParameterValues(ParameterValues),
41    AdvertiseServices(AdvertiseServices),
42    UnadvertiseServices(UnadvertiseServices),
43    ConnectionGraphUpdate(ConnectionGraphUpdate),
44}
45
46impl ServerJsonMessage {
47    pub fn to_message(&self) -> DigitalisResult<Message> {
48        Ok(Message::Text(self.serialize()?))
49    }
50
51    pub fn serialize(&self) -> DigitalisResult<String> {
52        Ok(serde_json::to_string(self)?)
53    }
54
55    pub fn deserialize(text: &str) -> DigitalisResult<Self> {
56        Ok(serde_json::from_str(text)?)
57    }
58}
59
60impl_enum_from!(ServerJsonMessage, ServerInfo);
61impl_enum_from!(ServerJsonMessage, Status);
62impl_enum_from!(ServerJsonMessage, Advertise);
63impl_enum_from!(ServerJsonMessage, Unadvertise);
64impl_enum_from!(ServerJsonMessage, ParameterValues);
65impl_enum_from!(ServerJsonMessage, AdvertiseServices);
66impl_enum_from!(ServerJsonMessage, UnadvertiseServices);
67
68impl_into_text_message!(ServerJsonMessage, ServerInfo);
69impl_into_text_message!(ServerJsonMessage, Status);
70impl_into_text_message!(ServerJsonMessage, Advertise);
71impl_into_text_message!(ServerJsonMessage, Unadvertise);
72impl_into_text_message!(ServerJsonMessage, ParameterValues);
73impl_into_text_message!(ServerJsonMessage, AdvertiseServices);
74impl_into_text_message!(ServerJsonMessage, UnadvertiseServices);
75
76#[serde_as]
77#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
78#[serde(rename_all = "camelCase")]
79pub struct ServerInfo {
80    pub name: String,
81    pub capabilities: Vec<Capability>,
82    pub supported_encodings: Vec<MessageEncoding>,
83    #[serde(skip_serializing_if = "Option::is_none")]
84    pub metadata: Option<HashMap<String, serde_json::Value>>,
85    #[serde(skip_serializing_if = "Option::is_none")]
86    pub session_id: Option<String>,
87}
88
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90pub struct Status {
91    pub level: Level,
92    pub message: String,
93}
94
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
96pub enum Level {
97    Info = 0,
98    Warning = 1,
99    Error = 2,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
103#[serde(rename_all = "camelCase")]
104pub struct Advertise {
105    pub channels: Vec<AdvertiseChannel>,
106}
107
108#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
109#[serde(rename_all = "camelCase")]
110pub struct Unadvertise {
111    pub channel_ids: Vec<Id>,
112}
113
114#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(rename_all = "camelCase")]
116pub struct ParameterValues {
117    pub parameters: Vec<Parameter>,
118    #[serde(skip_serializing_if = "Option::is_none")]
119    pub id: Option<String>,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
123#[serde(rename_all = "camelCase")]
124pub struct AdvertiseServices {
125    pub services: Vec<Service>,
126}
127
128#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
129#[serde(rename_all = "camelCase")]
130pub struct Service {
131    pub id: Id,
132    pub name: String,
133    pub r#type: String,
134    pub request_schema: String,
135    pub response_schema: String,
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
139#[serde(rename_all = "camelCase")]
140pub struct UnadvertiseServices {
141    pub ids: Vec<Id>,
142}
143
144#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145#[serde(rename_all = "camelCase")]
146pub struct ConnectionGraphUpdate {
147    pub publish_topics: Vec<PublishedTopic>,
148    pub suscribed_topics: Vec<SubscribedTopic>,
149    pub advertised_services: Vec<AdvertisedService>,
150    pub removed_topics: Vec<String>,
151    pub removed_services: Vec<String>,
152}
153
154#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
155#[serde(rename_all = "camelCase")]
156pub struct PublishedTopic {
157    pub name: String,
158    pub publisher_ids: Vec<String>,
159}
160
161#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
162#[serde(rename_all = "camelCase")]
163pub struct SubscribedTopic {
164    pub name: String,
165    pub subscriber_ids: Vec<String>,
166}
167
168#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
169#[serde(rename_all = "camelCase")]
170pub struct AdvertisedService {
171    pub name: String,
172    pub provider_ids: Vec<String>,
173}
174
175#[derive(Debug, Clone, PartialEq, Eq)]
176pub enum ServerBinaryMessage {
177    MessageData(MessageData),
178    Time(Time),
179    ServiceCallResponse(ServiceCallResponse),
180    FetchAssetResponse(FetchAssetResponse),
181}
182
183impl ServerBinaryMessage {
184    pub fn to_message(self) -> DigitalisResult<Message> {
185        let mut buf = BytesMut::new();
186        self.serialize(&mut buf);
187        Ok(Message::Binary(buf.into()))
188    }
189
190    pub fn serialize<T: BufMut>(&self, buf: &mut T) {
191        match self {
192            Self::MessageData(msg) => {
193                buf.put_u8(0x01);
194                msg.serialize(buf);
195            }
196            Self::Time(msg) => {
197                buf.put_u8(0x02);
198                msg.serialize(buf);
199            }
200            Self::ServiceCallResponse(msg) => {
201                buf.put_u8(0x03);
202                msg.serialize(buf);
203            }
204            Self::FetchAssetResponse(msg) => {
205                buf.put_u8(0x04);
206                msg.serialize(buf);
207            }
208        }
209    }
210
211    pub fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
212        Ok(match buf.get_u8() {
213            0x01 => Self::from(MessageData::deserialize(buf)?),
214            0x02 => Self::from(Time::deserialize(buf)?),
215            0x03 => Self::from(ServiceCallResponse::deserialize(buf)?),
216            0x04 => Self::from(FetchAssetResponse::deserialize(buf)?),
217            x => {
218                return Err(DigitalisError::BinaryDeserializeError(
219                    format!("Unknown protocol {}", x).into(),
220                ))
221            }
222        })
223    }
224}
225
226impl_enum_from!(ServerBinaryMessage, MessageData);
227impl_enum_from!(ServerBinaryMessage, Time);
228impl_enum_from!(ServerBinaryMessage, ServiceCallResponse);
229impl_enum_from!(ServerBinaryMessage, FetchAssetResponse);
230
231#[derive(Debug, Clone, PartialEq, Eq)]
232pub struct MessageData {
233    pub subscription_id: Id,
234    pub receive_timestamp: u64,
235    pub payload: Arc<Vec<u8>>,
236}
237
238impl MessageData {
239    fn serialize<T: BufMut>(&self, buf: &mut T) {
240        buf.put_u32_le(self.subscription_id);
241        buf.put_u64_le(self.receive_timestamp);
242        buf.put_slice(&self.payload);
243    }
244
245    fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
246        if buf.remaining() < size_of::<u32>() + size_of::<u64>() {
247            return Err(DigitalisError::BinaryDeserializeError(
248                "Data is too short".into(),
249            ));
250        }
251
252        let subscription_id = buf.get_u32_le();
253        let receive_timestamp = buf.get_u64_le();
254        let payload = buf.chunk().to_vec();
255        buf.advance(payload.len());
256
257        Ok(Self {
258            subscription_id,
259            receive_timestamp,
260            payload: Arc::new(payload),
261        })
262    }
263}
264
265#[derive(Debug, Clone, PartialEq, Eq)]
266pub struct Time {
267    pub timestamp: u64,
268}
269
270impl Time {
271    fn serialize<T: BufMut>(&self, buf: &mut T) {
272        buf.put_u64_le(self.timestamp);
273    }
274
275    fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
276        if buf.remaining() != size_of::<u64>() {
277            return Err(DigitalisError::BinaryDeserializeError(
278                "Data is too short".into(),
279            ));
280        }
281
282        Ok(Self {
283            timestamp: buf.get_u64_le(),
284        })
285    }
286}
287
288#[derive(Debug, Clone, PartialEq, Eq)]
289pub struct ServiceCallResponse {
290    pub service_id: Id,
291    pub call_id: Id,
292    pub encoding: Vec<u8>,
293    pub payload: Vec<u8>,
294}
295
296impl ServiceCallResponse {
297    fn serialize<T: BufMut>(&self, buf: &mut T) {
298        buf.put_u32_le(self.service_id);
299        buf.put_u32_le(self.call_id);
300        buf.put_u32_le(self.encoding.len() as u32);
301        buf.put_slice(&self.encoding);
302        buf.put_slice(&self.payload);
303    }
304
305    fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
306        if buf.remaining() < size_of::<u32>() * 3 {
307            return Err(DigitalisError::BinaryDeserializeError(
308                "Data is too short".into(),
309            ));
310        }
311
312        let service_id = buf.get_u32_le();
313        let call_id = buf.get_u32_le();
314
315        let encoding_len = buf.get_u32_le() as usize;
316        if buf.remaining() < encoding_len {
317            return Err(DigitalisError::BinaryDeserializeError(
318                "Data is too short".into(),
319            ));
320        }
321        let encoding = buf.chunk()[..encoding_len].to_vec();
322        buf.advance(encoding.len());
323
324        let payload = buf.chunk().to_vec();
325        buf.advance(payload.len());
326
327        Ok(Self {
328            service_id,
329            call_id,
330            encoding,
331            payload,
332        })
333    }
334}
335
336#[derive(Debug, Clone, PartialEq, Eq)]
337pub struct FetchAssetResponse {
338    pub request_id: Id,
339    pub status: u8,
340    pub error_message: Vec<u8>,
341    pub asset_data: Vec<u8>,
342}
343
344impl FetchAssetResponse {
345    fn serialize<T: BufMut>(&self, buf: &mut T) {
346        buf.put_u32_le(self.request_id);
347        buf.put_u8(self.status);
348        buf.put_u32_le(self.error_message.len() as u32);
349        buf.put_slice(&self.error_message);
350        buf.put_slice(&self.asset_data);
351    }
352
353    fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
354        if buf.remaining() < size_of::<u32>() * 2 + size_of::<u8>() {
355            return Err(DigitalisError::BinaryDeserializeError(
356                "Data is too short".into(),
357            ));
358        }
359
360        let request_id = buf.get_u32_le();
361        let status = buf.get_u8();
362
363        let error_message_len = buf.get_u32_le() as usize;
364        if buf.remaining() < error_message_len {
365            return Err(DigitalisError::BinaryDeserializeError(
366                "Data is too short".into(),
367            ));
368        }
369        let error_message = buf.chunk()[..error_message_len].to_vec();
370        buf.advance(error_message.len());
371
372        let asset_data = buf.chunk().to_vec();
373        buf.advance(asset_data.len());
374
375        Ok(Self {
376            request_id,
377            status,
378            error_message,
379            asset_data,
380        })
381    }
382}
383
// Round-trip tests: each binary payload is serialized into a `Vec<u8>` and
// decoded again from a slice over those bytes.
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_serialize_and_deserialize_message_data() {
        let msg = MessageData {
            subscription_id: 25,
            receive_timestamp: 23893748,
            payload: Arc::new(vec![1, 23, 125]),
        };

        let mut buf = Vec::new();
        msg.serialize(&mut buf);
        let msg2 = MessageData::deserialize(&mut buf.as_slice()).unwrap();

        assert_eq!(msg, msg2);
    }

    #[test]
    fn test_serialize_and_deserialize_time() {
        let msg = Time {
            timestamp: 23893748,
        };

        let mut buf = Vec::new();
        msg.serialize(&mut buf);
        let msg2 = Time::deserialize(&mut buf.as_slice()).unwrap();

        assert_eq!(msg, msg2);
    }

    #[test]
    fn test_serialize_and_deserialize_service_call_response() {
        let msg = ServiceCallResponse {
            service_id: 25,
            call_id: 23893748,
            encoding: vec![1, 23, 125],
            payload: vec![25, 225, 23, 125],
        };

        let mut buf = Vec::new();
        msg.serialize(&mut buf);
        let msg2 = ServiceCallResponse::deserialize(&mut buf.as_slice()).unwrap();

        assert_eq!(msg, msg2);
    }

    // Previously the only binary payload without a round-trip test.
    #[test]
    fn test_serialize_and_deserialize_fetch_asset_response() {
        let msg = FetchAssetResponse {
            request_id: 25,
            status: 1,
            error_message: vec![1, 23, 125],
            asset_data: vec![25, 225, 23, 125],
        };

        let mut buf = Vec::new();
        msg.serialize(&mut buf);
        let msg2 = FetchAssetResponse::deserialize(&mut buf.as_slice()).unwrap();

        assert_eq!(msg, msg2);
    }
}