1use std::{mem::size_of, sync::Arc};
2
3use bytes::{Buf, BufMut};
4use serde::{Deserialize, Serialize};
5use tungstenite::Message;
6
7use crate::{common::*, DigitalisError, DigitalisResult};
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub enum ClientMessage {
11 Subscribe(Subscribe),
12 Unsubscribe(Unsubscribe),
13 Advertise(ClientAdvertise),
14 Unadvertise(ClientUnadvertise),
15 GetParameters(GetParameters),
16 SetParameters(SetParameters),
17 SubscribeParameterUpdates(SubscribeParameterUpdates),
18 UnsubscribeParameterUpdates(UnsubscribeParameterUpdates),
19 SubscribeConnectionGraph,
20 UnsubscribeConnectionGraph,
21 FetchAsset(FetchAsset),
22 MessageData(MessageData),
23 ServiceCallRequest(ServiceCallRequest),
24 Close,
25}
26
27impl ClientMessage {
28 pub fn from_ws_message(raw_msg: Message) -> DigitalisResult<Self> {
29 match raw_msg {
30 Message::Binary(msg) => {
31 Ok(ClientBinaryMessage::deserialize(&mut msg.as_slice())?.into())
32 }
33 Message::Text(msg) => Ok(ClientJsonMessage::deserialize(&msg)?.into()),
34 Message::Close(_) => Ok(Self::Close),
35 m => Err(DigitalisError::UnexpectedWebsocketMessage(
36 format!("{}", m).into(),
37 )),
38 }
39 }
40}
41
42impl From<ClientJsonMessage> for ClientMessage {
43 fn from(msg: ClientJsonMessage) -> Self {
44 use ClientJsonMessage::*;
45 match msg {
46 Subscribe(msg) => Self::Subscribe(msg),
47 Unsubscribe(msg) => Self::Unsubscribe(msg),
48 Advertise(msg) => Self::Advertise(msg),
49 Unadvertise(msg) => Self::Unadvertise(msg),
50 GetParameters(msg) => Self::GetParameters(msg),
51 SetParameters(msg) => Self::SetParameters(msg),
52 SubscribeParameterUpdates(msg) => Self::SubscribeParameterUpdates(msg),
53 UnsubscribeParameterUpdates(msg) => Self::UnsubscribeParameterUpdates(msg),
54 SubscribeConnectionGraph => Self::SubscribeConnectionGraph,
55 UnsubscribeConnectionGraph => Self::UnsubscribeConnectionGraph,
56 FetchAsset(msg) => Self::FetchAsset(msg),
57 }
58 }
59}
60
61impl From<ClientBinaryMessage> for ClientMessage {
62 fn from(msg: ClientBinaryMessage) -> Self {
63 use ClientBinaryMessage::*;
64 match msg {
65 MessageData(msg) => Self::MessageData(msg),
66 ServiceCallRequest(msg) => Self::ServiceCallRequest(msg),
67 }
68 }
69}
70
71macro_rules! impl_enum_from {
72 ($parent:ident, $child:ident,$child_ty:ident) => {
73 impl From<$child_ty> for $parent {
74 fn from(msg: $child_ty) -> Self {
75 $parent::$child(msg)
76 }
77 }
78 };
79 ($parent:ident, $child:ident) => {
80 impl_enum_from!($parent, $child, $child);
81 };
82}
83
84macro_rules! impl_into_text_message {
85 ($parent:ident, $child:ident) => {
86 impl $child {
87 pub fn into_message(self) -> DigitalisResult<Message> {
88 $parent::from(self).to_message()
89 }
90 }
91 };
92}
93
94#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
95#[serde(tag = "op", rename_all = "camelCase")]
96pub enum ClientJsonMessage {
97 Subscribe(Subscribe),
98 Unsubscribe(Unsubscribe),
99 Advertise(ClientAdvertise),
100 Unadvertise(ClientUnadvertise),
101 GetParameters(GetParameters),
102 SetParameters(SetParameters),
103 SubscribeParameterUpdates(SubscribeParameterUpdates),
104 UnsubscribeParameterUpdates(UnsubscribeParameterUpdates),
105 SubscribeConnectionGraph,
106 UnsubscribeConnectionGraph,
107 FetchAsset(FetchAsset),
108}
109
110impl ClientJsonMessage {
111 pub fn to_message(&self) -> DigitalisResult<Message> {
112 Ok(Message::Text(self.serialize()?))
113 }
114
115 pub fn serialize(&self) -> DigitalisResult<String> {
116 Ok(serde_json::to_string(self)?)
117 }
118
119 pub fn deserialize(text: &str) -> DigitalisResult<Self> {
120 Ok(serde_json::from_str(text)?)
121 }
122}
123
124impl_enum_from!(ClientJsonMessage, Subscribe);
125impl_enum_from!(ClientJsonMessage, Unsubscribe);
126impl_enum_from!(ClientJsonMessage, Advertise, ClientAdvertise);
127impl_enum_from!(ClientJsonMessage, Unadvertise, ClientUnadvertise);
128impl_enum_from!(ClientJsonMessage, GetParameters);
129impl_enum_from!(ClientJsonMessage, SetParameters);
130impl_enum_from!(ClientJsonMessage, SubscribeParameterUpdates);
131impl_enum_from!(ClientJsonMessage, UnsubscribeParameterUpdates);
132impl_enum_from!(ClientJsonMessage, FetchAsset);
133
134impl_into_text_message!(ClientJsonMessage, Subscribe);
135impl_into_text_message!(ClientJsonMessage, Unsubscribe);
136impl_into_text_message!(ClientJsonMessage, ClientAdvertise);
137impl_into_text_message!(ClientJsonMessage, ClientUnadvertise);
138impl_into_text_message!(ClientJsonMessage, GetParameters);
139impl_into_text_message!(ClientJsonMessage, SetParameters);
140impl_into_text_message!(ClientJsonMessage, SubscribeParameterUpdates);
141impl_into_text_message!(ClientJsonMessage, UnsubscribeParameterUpdates);
142impl_into_text_message!(ClientJsonMessage, FetchAsset);
143
144#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145#[serde(rename_all = "camelCase")]
146pub struct Subscribe {
147 pub subscriptions: Vec<SubscribeChannel>,
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
151#[serde(rename_all = "camelCase")]
152pub struct Unsubscribe {
153 pub subscription_ids: Vec<Id>,
154}
155
156#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
157#[serde(rename_all = "camelCase")]
158pub struct ClientAdvertise {
159 pub channels: Vec<AdvertiseChannel>,
160}
161
162#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
163#[serde(rename_all = "camelCase")]
164pub struct ClientUnadvertise {
165 pub channel_ids: Vec<Id>,
166}
167
168#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
169#[serde(rename_all = "camelCase")]
170pub struct GetParameters {
171 pub parameter_names: Vec<String>,
172 #[serde(skip_serializing_if = "Option::is_none")]
173 pub id: Option<String>,
174}
175
176#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
177#[serde(rename_all = "camelCase")]
178pub struct SetParameters {
179 pub parameters: Vec<Parameter>,
180 #[serde(skip_serializing_if = "Option::is_none")]
181 pub id: Option<String>,
182}
183
184#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185#[serde(rename_all = "camelCase")]
186pub struct SubscribeParameterUpdates {
187 pub parameter_names: Vec<String>,
188}
189
190#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
191#[serde(rename_all = "camelCase")]
192pub struct UnsubscribeParameterUpdates {
193 pub parameter_names: Vec<String>,
194}
195
196#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
197#[serde(rename_all = "camelCase")]
198pub struct FetchAsset {
199 pub uri: String,
200 pub request_id: Id,
201}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
204pub enum ClientBinaryMessage {
205 MessageData(MessageData),
206 ServiceCallRequest(ServiceCallRequest),
207}
208
209impl ClientBinaryMessage {
210 pub fn serialize<T: BufMut>(&self, buf: &mut T) {
211 match self {
212 Self::MessageData(msg) => {
213 buf.put_u8(0x01);
214 msg.serialize(buf);
215 }
216 Self::ServiceCallRequest(msg) => {
217 buf.put_u8(0x02);
218 msg.serialize(buf);
219 }
220 }
221 }
222
223 pub fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
224 Ok(match buf.get_u8() {
225 0x01 => Self::from(MessageData::deserialize(buf)?),
226 0x02 => Self::from(ServiceCallRequest::deserialize(buf)?),
227 x => {
228 return Err(DigitalisError::BinaryDeserializeError(
229 format!("Unknown protocol {}", x).into(),
230 ))
231 }
232 })
233 }
234}
235
236impl_enum_from!(ClientBinaryMessage, MessageData);
237impl_enum_from!(ClientBinaryMessage, ServiceCallRequest);
238
239#[derive(Debug, Clone, PartialEq, Eq)]
240pub struct MessageData {
241 pub channel_id: Id,
242 pub payload: Arc<Vec<u8>>,
243}
244
245impl MessageData {
246 fn serialize<T: BufMut>(&self, buf: &mut T) {
247 buf.put_u32_le(self.channel_id);
248 buf.put_slice(&self.payload);
249 }
250
251 fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
252 if buf.remaining() < size_of::<u32>() {
253 return Err(DigitalisError::BinaryDeserializeError(
254 "Data is too short".into(),
255 ));
256 }
257
258 let channel_id = buf.get_u32_le();
259 let payload = buf.chunk().to_vec();
260 buf.advance(payload.len());
261
262 Ok(Self {
263 channel_id,
264 payload: Arc::new(payload),
265 })
266 }
267}
268
269#[derive(Debug, Clone, PartialEq, Eq)]
270pub struct ServiceCallRequest {
271 pub service_id: Id,
272 pub call_id: Id,
273 pub encoding: Vec<u8>,
274 pub payload: Vec<u8>,
275}
276
277impl ServiceCallRequest {
278 fn serialize<T: BufMut>(&self, buf: &mut T) {
279 buf.put_u32_le(self.service_id);
280 buf.put_u32_le(self.call_id);
281 buf.put_u32_le(self.encoding.len() as u32);
282 buf.put_slice(&self.encoding);
283 buf.put_slice(&self.payload);
284 }
285
286 fn deserialize<T: Buf>(buf: &mut T) -> DigitalisResult<Self> {
287 if buf.remaining() < size_of::<u32>() * 3 {
288 return Err(DigitalisError::BinaryDeserializeError(
289 "Data is too short".into(),
290 ));
291 }
292
293 let service_id = buf.get_u32_le();
294 let call_id = buf.get_u32_le();
295
296 let encoding_len = buf.get_u32_le() as usize;
297 if buf.remaining() < encoding_len {
298 return Err(DigitalisError::BinaryDeserializeError(
299 "Data is too short".into(),
300 ));
301 }
302 let encoding = buf.chunk()[..encoding_len].to_vec();
303 buf.advance(encoding.len());
304
305 let payload = buf.chunk().to_vec();
306 buf.advance(payload.len());
307
308 Ok(Self {
309 service_id,
310 call_id,
311 encoding,
312 payload,
313 })
314 }
315}