Skip to main content

digitalis_core/
client.rs

1use std::{mem::size_of, sync::Arc};
2
3use bytes::{Buf, BufMut};
4use serde::{Deserialize, Serialize};
5use tungstenite::Message;
6
7use crate::{common::*, DigitalisError, DigitalisResult};
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub enum ClientMessage {
11    Subscribe(Subscribe),
12    Unsubscribe(Unsubscribe),
13    Advertise(ClientAdvertise),
14    Unadvertise(ClientUnadvertise),
15    GetParameters(GetParameters),
16    SetParameters(SetParameters),
17    SubscribeParameterUpdates(SubscribeParameterUpdates),
18    UnsubscribeParameterUpdates(UnsubscribeParameterUpdates),
19    SubscribeConnectionGraph,
20    UnsubscribeConnectionGraph,
21    FetchAsset(FetchAsset),
22    MessageData(MessageData),
23    ServiceCallRequest(ServiceCallRequest),
24    Close,
25}
26
27impl ClientMessage {
28    pub fn from_ws_message(raw_msg: Message) -> DigitalisResult<Self> {
29        match raw_msg {
30            Message::Binary(msg) => {
31                Ok(ClientBinaryMessage::deserialize(&mut msg.as_slice())?.into())
32            }
33            Message::Text(msg) => Ok(ClientJsonMessage::deserialize(&msg)?.into()),
34            Message::Close(_) => Ok(Self::Close),
35            m => Err(DigitalisError::UnexpectedWebsocketMessage(
36                format!("{}", m).into(),
37            )),
38        }
39    }
40}
41
42impl From<ClientJsonMessage> for ClientMessage {
43    fn from(msg: ClientJsonMessage) -> Self {
44        use ClientJsonMessage::*;
45        match msg {
46            Subscribe(msg) => Self::Subscribe(msg),
47            Unsubscribe(msg) => Self::Unsubscribe(msg),
48            Advertise(msg) => Self::Advertise(msg),
49            Unadvertise(msg) => Self::Unadvertise(msg),
50            GetParameters(msg) => Self::GetParameters(msg),
51            SetParameters(msg) => Self::SetParameters(msg),
52            SubscribeParameterUpdates(msg) => Self::SubscribeParameterUpdates(msg),
53            UnsubscribeParameterUpdates(msg) => Self::UnsubscribeParameterUpdates(msg),
54            SubscribeConnectionGraph => Self::SubscribeConnectionGraph,
55            UnsubscribeConnectionGraph => Self::UnsubscribeConnectionGraph,
56            FetchAsset(msg) => Self::FetchAsset(msg),
57        }
58    }
59}
60
61impl From<ClientBinaryMessage> for ClientMessage {
62    fn from(msg: ClientBinaryMessage) -> Self {
63        use ClientBinaryMessage::*;
64        match msg {
65            MessageData(msg) => Self::MessageData(msg),
66            ServiceCallRequest(msg) => Self::ServiceCallRequest(msg),
67        }
68    }
69}
70
71macro_rules! impl_enum_from {
72    ($parent:ident, $child:ident,$child_ty:ident) => {
73        impl From<$child_ty> for $parent {
74            fn from(msg: $child_ty) -> Self {
75                $parent::$child(msg)
76            }
77        }
78    };
79    ($parent:ident, $child:ident) => {
80        impl_enum_from!($parent, $child, $child);
81    };
82}
83
84macro_rules! impl_into_text_message {
85    ($parent:ident, $child:ident) => {
86        impl $child {
87            pub fn into_message(self) -> DigitalisResult<Message> {
88                $parent::from(self).to_message()
89            }
90        }
91    };
92}
93
94#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
95#[serde(tag = "op", rename_all = "camelCase")]
96pub enum ClientJsonMessage {
97    Subscribe(Subscribe),
98    Unsubscribe(Unsubscribe),
99    Advertise(ClientAdvertise),
100    Unadvertise(ClientUnadvertise),
101    GetParameters(GetParameters),
102    SetParameters(SetParameters),
103    SubscribeParameterUpdates(SubscribeParameterUpdates),
104    UnsubscribeParameterUpdates(UnsubscribeParameterUpdates),
105    SubscribeConnectionGraph,
106    UnsubscribeConnectionGraph,
107    FetchAsset(FetchAsset),
108}
109
110impl ClientJsonMessage {
111    pub fn to_message(&self) -> DigitalisResult<Message> {
112        Ok(Message::Text(self.serialize()?))
113    }
114
115    pub fn serialize(&self) -> DigitalisResult<String> {
116        Ok(serde_json::to_string(self)?)
117    }
118
119    pub fn deserialize(text: &str) -> DigitalisResult<Self> {
120        Ok(serde_json::from_str(text)?)
121    }
122}
123
124impl_enum_from!(ClientJsonMessage, Subscribe);
125impl_enum_from!(ClientJsonMessage, Unsubscribe);
126impl_enum_from!(ClientJsonMessage, Advertise, ClientAdvertise);
127impl_enum_from!(ClientJsonMessage, Unadvertise, ClientUnadvertise);
128impl_enum_from!(ClientJsonMessage, GetParameters);
129impl_enum_from!(ClientJsonMessage, SetParameters);
130impl_enum_from!(ClientJsonMessage, SubscribeParameterUpdates);
131impl_enum_from!(ClientJsonMessage, UnsubscribeParameterUpdates);
132impl_enum_from!(ClientJsonMessage, FetchAsset);
133
134impl_into_text_message!(ClientJsonMessage, Subscribe);
135impl_into_text_message!(ClientJsonMessage, Unsubscribe);
136impl_into_text_message!(ClientJsonMessage, ClientAdvertise);
137impl_into_text_message!(ClientJsonMessage, ClientUnadvertise);
138impl_into_text_message!(ClientJsonMessage, GetParameters);
139impl_into_text_message!(ClientJsonMessage, SetParameters);
140impl_into_text_message!(ClientJsonMessage, SubscribeParameterUpdates);
141impl_into_text_message!(ClientJsonMessage, UnsubscribeParameterUpdates);
142impl_into_text_message!(ClientJsonMessage, FetchAsset);
143
144#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145#[serde(rename_all = "camelCase")]
146pub struct Subscribe {
147    pub subscriptions: Vec<SubscribeChannel>,
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
151#[serde(rename_all = "camelCase")]
152pub struct Unsubscribe {
153    pub subscription_ids: Vec<Id>,
154}
155
156#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
157#[serde(rename_all = "camelCase")]
158pub struct ClientAdvertise {
159    pub channels: Vec<AdvertiseChannel>,
160}
161
162#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
163#[serde(rename_all = "camelCase")]
164pub struct ClientUnadvertise {
165    pub channel_ids: Vec<Id>,
166}
167
168#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
169#[serde(rename_all = "camelCase")]
170pub struct GetParameters {
171    pub parameter_names: Vec<String>,
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub id: Option<String>,
174}
175
176#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
177#[serde(rename_all = "camelCase")]
178pub struct SetParameters {
179    pub parameters: Vec<Parameter>,
180    #[serde(skip_serializing_if = "Option::is_none")]
181    pub id: Option<String>,
182}
183
184#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185#[serde(rename_all = "camelCase")]
186pub struct SubscribeParameterUpdates {
187    pub parameter_names: Vec<String>,
188}
189
190#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
191#[serde(rename_all = "camelCase")]
192pub struct UnsubscribeParameterUpdates {
193    pub parameter_names: Vec<String>,
194}
195
196#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
197#[serde(rename_all = "camelCase")]
198pub struct FetchAsset {
199    pub uri: String,
200    pub request_id: Id,
201}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
204pub enum ClientBinaryMessage {
205    MessageData(MessageData),
206    ServiceCallRequest(ServiceCallRequest),
207}
208
209impl ClientBinaryMessage {
210    pub fn serialize<T: BufMut>(&self, buf: &mut T) {
211        match self {
212            Self::MessageData(msg) => {
213                buf.put_u8(0x01);
214                msg.serialize(buf);
215            }
216            Self::ServiceCallRequest(msg) => {
217                buf.put_u8(0x02);
218                msg.serialize(buf);
219            }
220        }
221    }
222
223    pub fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
224        Ok(match buf.get_u8() {
225            0x01 => Self::from(MessageData::deserialize(buf)?),
226            0x02 => Self::from(ServiceCallRequest::deserialize(buf)?),
227            x => {
228                return Err(DigitalisError::BinaryDeserializeError(
229                    format!("Unknown protocol {}", x).into(),
230                ))
231            }
232        })
233    }
234}
235
236impl_enum_from!(ClientBinaryMessage, MessageData);
237impl_enum_from!(ClientBinaryMessage, ServiceCallRequest);
238
239#[derive(Debug, Clone, PartialEq, Eq)]
240pub struct MessageData {
241    pub channel_id: Id,
242    pub payload: Arc<Vec<u8>>,
243}
244
245impl MessageData {
246    fn serialize<T: BufMut>(&self, buf: &mut T) {
247        buf.put_u32_le(self.channel_id);
248        buf.put_slice(&self.payload);
249    }
250
251    fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
252        if buf.remaining() < size_of::<u32>() {
253            return Err(DigitalisError::BinaryDeserializeError(
254                "Data is too short".into(),
255            ));
256        }
257
258        let channel_id = buf.get_u32_le();
259        let payload = buf.chunk().to_vec();
260        buf.advance(payload.len());
261
262        Ok(Self {
263            channel_id,
264            payload: Arc::new(payload),
265        })
266    }
267}
268
269#[derive(Debug, Clone, PartialEq, Eq)]
270pub struct ServiceCallRequest {
271    pub service_id: Id,
272    pub call_id: Id,
273    pub encoding: Vec<u8>,
274    pub payload: Vec<u8>,
275}
276
277impl ServiceCallRequest {
278    fn serialize<T: BufMut>(&self, buf: &mut T) {
279        buf.put_u32_le(self.service_id);
280        buf.put_u32_le(self.call_id);
281        buf.put_u32_le(self.encoding.len() as u32);
282        buf.put_slice(&self.encoding);
283        buf.put_slice(&self.payload);
284    }
285
286    fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
287        if buf.remaining() < size_of::<u32>() * 3 {
288            return Err(DigitalisError::BinaryDeserializeError(
289                "Data is too short".into(),
290            ));
291        }
292
293        let service_id = buf.get_u32_le();
294        let call_id = buf.get_u32_le();
295
296        let encoding_len = buf.get_u32_le() as usize;
297        if buf.remaining() < encoding_len {
298            return Err(DigitalisError::BinaryDeserializeError(
299                "Data is too short".into(),
300            ));
301        }
302        let encoding = buf.chunk()[..encoding_len].to_vec();
303        buf.advance(encoding.len());
304
305        let payload = buf.chunk().to_vec();
306        buf.advance(payload.len());
307
308        Ok(Self {
309            service_id,
310            call_id,
311            encoding,
312            payload,
313        })
314    }
315}