nt_rs/types.rs
1use std::string::FromUtf8Error;
2
3use rmp::{
4 decode::{self, NumValueReadError, ValueReadError},
5 encode::{self, ValueWriteError},
6};
7use serde::{Deserialize, Serialize};
8use thiserror::Error;
9
10fn should_skip(val: &MissingOrNull<bool>) -> bool {
11 *val == MissingOrNull::Missing
12}
13
/// Serde `skip_serializing_if` predicate for optional fields: `true` when the option is `None`.
fn skip_none<T>(val: &Option<T>) -> bool {
    matches!(val, None)
}
17
18/// Each published topic may also have properties associated to it. Properties are represented in
19/// the protocol as JSON and thus property values may be any JSON type. Property keys must be
20/// strings. The following properties have a defined meaning in this spec. Servers shall support
21/// arbitrary properties being set outside of this set. Clients shall ignore properties they do not
22/// recognize. Properties are initially set on publish and may be changed (by any client) using
23/// [TextMessage::SetProperties]
24#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
25pub struct Properties {
26 /// If true, the last set value will be periodically saved to persistent storage on the server
27 /// and be restored during server startup. Topics with this property set to true will not be
28 /// deleted by the server when the last publisher stops publishing.
29 #[serde(
30 with = "missing_or_null_impls",
31 default,
32 skip_serializing_if = "should_skip"
33 )]
34 pub persistent: MissingOrNull<bool>,
35
36 /// Topics with this property set to true will not be deleted by the server when the last
37 /// publisher stops publishing.
38 #[serde(
39 with = "missing_or_null_impls",
40 default,
41 skip_serializing_if = "should_skip"
42 )]
43 pub retained: MissingOrNull<bool>,
44
45 /// If false, the server and clients will not store the value of the topic. This means that
46 /// only value updates will be available for the topic.
47 #[serde(
48 with = "missing_or_null_impls",
49 default,
50 skip_serializing_if = "should_skip"
51 )]
52 pub cached: MissingOrNull<bool>,
53}
54
55impl Default for Properties {
56 fn default() -> Self {
57 Self {
58 persistent: Default::default(),
59 retained: Default::default(),
60 cached: Default::default(),
61 }
62 }
63}
64
65impl Properties {
66 pub fn update(&mut self, other: Properties) {
67 self.persistent.update(other.persistent);
68 self.retained.update(other.retained);
69 self.cached.update(other.cached);
70 }
71}
72
73mod missing_or_null_impls {
74 use serde::{Deserialize, Deserializer, Serialize, Serializer};
75
76 use super::MissingOrNull;
77
78 pub fn serialize<S: Serializer>(
79 value: &MissingOrNull<bool>,
80 serializer: S,
81 ) -> Result<S::Ok, S::Error> {
82 <Option<bool>>::from(value.to_owned()).serialize(serializer)
83 }
84
85 pub fn deserialize<'de, D: Deserializer<'de>>(
86 deserializer: D,
87 ) -> Result<MissingOrNull<bool>, D::Error> {
88 <Option<bool>>::deserialize(deserializer).map(|option| option.into())
89 }
90}
91
92/// Each subscription may have options set. The following options have a defined meaning in this
93/// spec. Servers shall preserve arbitrary options, as servers and clients may support arbitrary
94/// options outside of this set. Options are set using Subscribe Message ([TextMessage::Subscribe])
95/// and cannot be changed.
96#[derive(Serialize, Deserialize, Debug)]
97pub struct SubscriptionOptions {
98 /// How frequently the server should send changes. The server may send more frequently than
99 /// this (e.g. use a combined minimum period for all values) or apply a restricted range to
100 /// this value. The default if unspecified is 100 ms (same as NT 3.0).
101 #[serde(skip_serializing_if = "skip_none", default)]
102 pub periodic: Option<u32>,
103
104 /// If true, the server should send all value changes over the wire. If false, only the most
105 /// recent value is sent (same as NT 3.0 behavior). If not specified, defaults to false.
106 #[serde(skip_serializing_if = "skip_none", default)]
107 pub all: Option<bool>,
108
109 /// If true, the server should not send any value changes over the wire regardless of other
110 /// options. This is useful for only getting topic announcements. If false, value changes are
111 /// sent in accordance with other options. If not specified, defaults to false.
112 #[serde(skip_serializing_if = "skip_none", default)]
113 pub topicsonly: Option<bool>,
114
115 /// If true, any topic starting with the name in the subscription topics list is subscribed to,
116 /// not just exact matches. If not specified, defaults to false.
117 #[serde(skip_serializing_if = "skip_none", default)]
118 pub prefix: Option<bool>,
119}
120
121impl Default for SubscriptionOptions {
122 fn default() -> Self {
123 Self {
124 periodic: None,
125 all: None,
126 topicsonly: None,
127 prefix: None,
128 }
129 }
130}
131
132#[derive(Serialize, Deserialize, Debug)]
133#[serde(tag = "method", content = "params")]
134pub enum TextMessage {
135 /// Sent from a client to the server to indicate the client wants to start publishing values at
136 /// the given topic. The server shall respond with a Topic Announcement Message
137 /// ([TextMessage::Announce]), even if the topic was previously announced. The client can start
138 /// publishing data values via MessagePack messages immediately after sending this message, but
139 /// the messages will be ignored by the server if the publisher data type does not match the
140 /// topic data type.
141 #[serde(rename = "publish")]
142 Publish {
143 /// The topic name being published
144 name: String,
145
146 /// A client-generated unique identifier for this publisher. Use the same UID later to
147 /// unpublish. This is also the identifier that the client will use in MessagePack messages
148 /// for this topic.
149 pubuid: u32,
150
151 /// The requested data type (as a string).
152 ///
153 /// If the topic is newly created (e.g. there are no other publishers) this sets the value
154 /// type. If the topic was previously published, this is ignored. The
155 /// [TextMessage::Announce] message contains the actual topic value type that the client
156 /// shall use when publishing values.
157 ///
158 /// Implementations should indicate an error if the user tries to publish an incompatible
159 /// type to that already set for the topic.
160 #[serde(rename = "type")]
161 data_type: String, // TODO: Make real type
162
163 /// Initial topic properties.
164 ///
165 /// If the topic is newly created (e.g. there are no other publishers) this sets the topic
166 /// properties. If the topic was previously published, this is ignored. The
167 /// [TextMessage::Announce] message contains the actual topic properties. Clients can use
168 /// the [TextMessage::SetProperties] message to change properties after topic creation.
169 properties: Properties,
170 },
171
172 /// Sent from a client to the server to indicate the client wants to stop publishing values for
173 /// the given topic and publisher. The client should stop publishing data value updates via
174 /// binary MessagePack messages for this publisher prior to sending this message.
175 ///
176 /// When there are no remaining publishers for a non-persistent topic, the server shall delete
177 /// the topic and send a Topic Removed Message ([TextMessage::Unannounce]) to all clients who
178 /// have been sent a previous Topic Announcement Message ([TextMessage::Announce]) for the
179 /// topic.
180 #[serde(rename = "unpublish")]
181 Unpublish {
182 /// The same unique identifier passed to the [TextMessage::Publish] message
183 pubuid: u32,
184 },
185
186 /// Sent from a client to the server to change properties (see [Properties]) for a given topic.
187 /// The server will send a corresponding Properties Update Message ([TextMessage::Properties])
188 /// to all subscribers to the topic (if the topic is published). This message shall be ignored
189 /// by the server if the topic is not published.
190 #[serde(rename = "setproperties")]
191 SetProperties { name: String, update: Properties },
192
193 /// Sent from a client to the server to indicate the client wants to subscribe to value changes
194 /// for the specified topics / groups of topics. The server shall send MessagePack messages
195 /// containing the current values for any existing cached topics upon receipt, and continue
196 /// sending MessagePack messages for future value changes. If a topic does not yet exist, no
197 /// message is sent until it is created (via a publish), at which point a Topic Announcement
198 /// Message ([TextMessage::Announce]) will be sent and MessagePack messages will automatically
199 /// follow as they are published.
200 ///
201 /// Subscriptions may overlap; only one MessagePack message is sent per value change regardless
202 /// of the number of subscriptions. Sending a subscribe message with the same subscription UID
203 /// as a previous subscribe message results in updating the subscription (replacing the array
204 /// of identifiers and updating any specified options).
205 #[serde(rename = "subscribe")]
206 Subscribe {
207 /// One or more topic names or prefixes (if the prefix option is true) to start receiving
208 /// messages for.
209 topics: Vec<String>,
210
211 /// A client-generated unique identifier for this subscription. Use the same UID later to
212 /// unsubscribe.
213 subuid: u32,
214
215 /// [SubscriptionOptions]
216 options: SubscriptionOptions,
217 },
218
219 /// Sent from a client to the server to indicate the client wants to stop subscribing to
220 /// messages for the given subscription.
221 #[serde(rename = "unsubscribe")]
222 Unsubscribe {
223 /// The same unique identifier passed to the [TextMessage::Subscribe] message
224 subuid: u32,
225 },
226
227 /// The server shall send this message for each of the following conditions:
228 /// - To all clients subscribed to a matching prefix when a topic is created
229 /// - To a client in response to an Publish Request Message ([TextMessage::Publish]) from that client
230 #[serde(rename = "announce")]
231 Announce {
232 name: String,
233
234 /// The identifier that the server will use in MessagePack messages for this topic
235 id: u32,
236
237 /// The data type for the topic (as a string)
238 #[serde(rename = "type")]
239 data_type: String,
240
241 /// If this message was sent in response to a [TextMessage::Publish] message, the Publisher UID provided
242 /// in that message. Otherwise absent.
243 pubuid: Option<u32>,
244
245 /// Topic [Properties]
246 properties: Properties,
247 },
248
249 /// The server shall send this message when a previously announced (via a Topic Announcement
250 /// Message ([TextMessage::Announce])) topic is deleted.
251 #[serde(rename = "unannounce")]
252 Unannounce {
253 name: String,
254
255 /// The identifier that the server was using for value updates
256 id: u32,
257 },
258
259 /// The server shall send this message when a previously announced (via a Topic Announcement
260 /// Message ([TextMessage::Announce])) topic has its properties changed (via Set Properties Message
261 /// ([TextMessage::SetProperties])).
262 #[serde(rename = "properties")]
263 Properties {
264 name: String,
265
266 /// True if this message is in response to a [TextMessage::SetProperties] message from the
267 /// same client. Otherwise absent.
268 ack: bool,
269
270 /// The client shall handle the update value as follows. If a property is not included in
271 /// the update map, its value is not changed. If a property is provided in the update map
272 /// with a value of null, the property is deleted.
273 update: Properties,
274 },
275}
276
/// A tri-state value that distinguishes "not present at all" from "explicitly null" from
/// "present with a value".
///
/// The [`Default`] is [`MissingOrNull::Missing`]. Converting from an `Option` never yields
/// `Missing` (`None` maps to `Null`), while converting back to an `Option` collapses both
/// `Missing` and `Null` into `None`.
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum MissingOrNull<T> {
    /// The value was not present.
    Missing,
    /// The value was present but explicitly null.
    Null,
    /// The value was present.
    Value(T),
}

impl<T> From<Option<T>> for MissingOrNull<T> {
    /// `Some(v)` becomes `Value(v)`; `None` becomes `Null`.
    fn from(value: Option<T>) -> Self {
        value.map_or(MissingOrNull::Null, MissingOrNull::Value)
    }
}

impl<T> From<MissingOrNull<T>> for Option<T> {
    /// `Value(v)` becomes `Some(v)`; both `Missing` and `Null` become `None`.
    fn from(value: MissingOrNull<T>) -> Option<T> {
        match value {
            MissingOrNull::Value(val) => Some(val),
            MissingOrNull::Missing | MissingOrNull::Null => None,
        }
    }
}

impl<T> Default for MissingOrNull<T> {
    /// The default state is `Missing`; no `T: Default` bound is required.
    fn default() -> Self {
        MissingOrNull::Missing
    }
}

impl<T> MissingOrNull<T> {
    /// Overwrite `self` with `other` unless `other` is `Missing`.
    ///
    /// `Missing` means "no change requested", so it leaves `self` untouched; `Null` and
    /// `Value(_)` both replace the current contents.
    pub fn update(&mut self, other: Self) {
        if !matches!(other, MissingOrNull::Missing) {
            *self = other;
        }
    }
}
319
320/// A single binary message that could be sent in a binary websocket frame
321#[derive(Debug)]
322pub struct BinaryMessage {
323 pub id: i64,
324 pub timestamp: u64,
325 pub data: BinaryData,
326}
327
328impl BinaryMessage {
329 /// Decode one entire message
330 pub fn from_reader<R: std::io::Read>(reader: &mut R) -> Result<Self, BinaryMessageError> {
331 let len = decode::read_array_len(reader)?;
332
333 if len != 4 {
334 Err(BinaryMessageError::MessageLen(len))
335 } else {
336 Ok(Self {
337 id: decode::read_int(reader)?,
338 timestamp: decode::read_int(reader)?,
339 data: BinaryData::from_reader(reader)?,
340 })
341 }
342 }
343
344 /// Enocde this message onto a writer
345 pub fn to_writer<W: std::io::Write>(&self, writer: &mut W) -> Result<(), BinaryMessageError> {
346 encode::write_array_len(writer, 4)?;
347 encode::write_sint(writer, self.id)?;
348 encode::write_uint(writer, self.timestamp)?;
349 self.data.to_writer(writer)?;
350 Ok(())
351 }
352}
353
/// All defined types that could be sent in binary frames
///
/// Values can be compared with `==`. Floating-point payloads follow IEEE 754 comparison rules, so
/// a payload containing `NaN` is never equal to itself.
#[derive(Debug, Clone, PartialEq)]
pub enum BinaryData {
    /// A single boolean (wire type tag 0).
    Boolean(bool),
    /// A 64-bit float (wire type tag 1).
    Double(f64),
    /// A signed integer (wire type tag 2).
    Int(i64),
    /// A 32-bit float (wire type tag 3).
    Float(f32),
    /// A UTF-8 string (wire type tag 4).
    Str(String),
    /// Raw bytes (wire type tag 5).
    Bin(Vec<u8>),
    /// An array of booleans (wire type tag 16).
    BoolArray(Vec<bool>),
    /// An array of 64-bit floats (wire type tag 17).
    DoubleArray(Vec<f64>),
    /// An array of signed integers (wire type tag 18).
    IntArray(Vec<i64>),
    /// An array of 32-bit floats (wire type tag 19).
    FloatArray(Vec<f32>),
    /// An array of UTF-8 strings (wire type tag 20).
    StringArray(Vec<String>),
}
369
/// Errors produced while decoding or encoding [`BinaryMessage`] / [`BinaryData`] values.
///
/// Most variants wrap an underlying error and are created through their `#[from]` conversion
/// when `?` is used on the corresponding call.
#[derive(Debug, Error)]
pub enum BinaryMessageError {
    /// A MessagePack number could not be read (wraps `NumValueReadError`).
    #[error("Could not parse number: {0}")]
    IntError(#[from] NumValueReadError<std::io::Error>),
    /// A MessagePack value or length header could not be read (wraps `ValueReadError`).
    #[error("Could not read value: {0}")]
    ValueReadError(#[from] ValueReadError<std::io::Error>),
    /// A MessagePack value could not be written (wraps `ValueWriteError`).
    #[error("Could not write value: {0}")]
    ValueWriteError(#[from] ValueWriteError<std::io::Error>),
    /// The payload's type tag is not one of the tags `BinaryData::from_reader` understands; the
    /// offending tag is carried in the variant.
    #[error("Unknown data type: {0}")]
    UnknownDataType(u8),
    /// The bytes of a string payload were not valid UTF-8.
    #[error("Could not parse utf8 while parsing a string: {0}")]
    InvalidUTF8(#[from] FromUtf8Error),
    /// A raw I/O operation on the underlying reader or writer failed.
    #[error("Encountered an error when reading more data: {0}")]
    IoError(#[from] std::io::Error),
    /// The outer message array did not have exactly four elements; the length that was found is
    /// carried in the variant.
    #[error("Incorrect binary message length, expected 4, found {0}")]
    MessageLen(u32),
}
387
388impl BinaryData {
389 /// Decode a single chunk of binary data from a reader
390 pub fn from_reader<R: std::io::Read>(reader: &mut R) -> Result<Self, BinaryMessageError> {
391 let data_type: u8 = decode::read_int(reader)?;
392
393 let data = match data_type {
394 0 => BinaryData::Boolean(decode::read_bool(reader)?),
395 1 => BinaryData::Double(decode::read_f64(reader)?),
396 2 => BinaryData::Int(decode::read_int(reader)?),
397 3 => BinaryData::Float(decode::read_f32(reader)?),
398 4 => {
399 let len = decode::read_str_len(reader)?;
400 let mut data = vec![0; len as usize];
401 reader.read_exact(&mut data)?;
402
403 BinaryData::Str(String::from_utf8(data)?)
404 }
405 5 => {
406 let len = decode::read_bin_len(reader)?;
407 let mut data = vec![0; len as usize];
408 reader.read_exact(&mut data)?;
409
410 BinaryData::Bin(data)
411 }
412 16 => {
413 let len = decode::read_array_len(reader)?;
414
415 BinaryData::BoolArray(
416 (0..len)
417 .map(|_| decode::read_bool(reader))
418 .collect::<Result<_, _>>()?,
419 )
420 }
421 17 => {
422 let len = decode::read_array_len(reader)?;
423
424 BinaryData::DoubleArray(
425 (0..len)
426 .map(|_| decode::read_f64(reader))
427 .collect::<Result<_, _>>()?,
428 )
429 }
430 18 => {
431 let len = decode::read_array_len(reader)?;
432
433 BinaryData::IntArray(
434 (0..len)
435 .map(|_| decode::read_int(reader))
436 .collect::<Result<_, _>>()?,
437 )
438 }
439 19 => {
440 let len = decode::read_array_len(reader)?;
441
442 BinaryData::FloatArray(
443 (0..len)
444 .map(|_| decode::read_f32(reader))
445 .collect::<Result<_, _>>()?,
446 )
447 }
448 20 => {
449 let len = decode::read_array_len(reader)?;
450
451 BinaryData::StringArray(
452 (0..len)
453 .map(|_| -> Result<String, BinaryMessageError> {
454 let len = decode::read_str_len(reader)?;
455 let mut data = vec![0; len as usize];
456 reader.read_exact(&mut data)?;
457
458 Ok(String::from_utf8(data)?)
459 })
460 .collect::<Result<_, _>>()?,
461 )
462 }
463 n => return Err(BinaryMessageError::UnknownDataType(n)),
464 };
465
466 Ok(data)
467 }
468
469 /// Encode this binary payload to the wire
470 pub fn to_writer<W: std::io::Write>(&self, writer: &mut W) -> Result<(), BinaryMessageError> {
471 match self {
472 BinaryData::Boolean(val) => {
473 encode::write_uint(writer, 0)?;
474 encode::write_bool(writer, *val)?;
475 }
476 BinaryData::Double(val) => {
477 encode::write_uint(writer, 1)?;
478 encode::write_f64(writer, *val)?;
479 }
480 BinaryData::Int(val) => {
481 encode::write_uint(writer, 2)?;
482 encode::write_sint(writer, *val)?;
483 }
484 BinaryData::Float(val) => {
485 encode::write_uint(writer, 3)?;
486 encode::write_f32(writer, *val)?;
487 }
488 BinaryData::Str(val) => {
489 encode::write_uint(writer, 4)?;
490 encode::write_str(writer, &val)?;
491 }
492 BinaryData::Bin(val) => {
493 encode::write_uint(writer, 5)?;
494 encode::write_bin(writer, &val)?;
495 }
496 BinaryData::BoolArray(val) => {
497 encode::write_uint(writer, 16)?;
498 encode::write_array_len(writer, val.len() as u32)?;
499 for val in val {
500 encode::write_bool(writer, *val)?;
501 }
502 }
503 BinaryData::DoubleArray(val) => {
504 encode::write_uint(writer, 17)?;
505 encode::write_array_len(writer, val.len() as u32)?;
506 for val in val {
507 encode::write_f64(writer, *val)?;
508 }
509 }
510 BinaryData::IntArray(val) => {
511 encode::write_uint(writer, 18)?;
512 encode::write_array_len(writer, val.len() as u32)?;
513 for val in val {
514 encode::write_sint(writer, *val)?;
515 }
516 }
517 BinaryData::FloatArray(val) => {
518 encode::write_uint(writer, 19)?;
519 encode::write_array_len(writer, val.len() as u32)?;
520 for val in val {
521 encode::write_f32(writer, *val)?;
522 }
523 }
524 BinaryData::StringArray(val) => {
525 encode::write_uint(writer, 20)?;
526 encode::write_array_len(writer, val.len() as u32)?;
527 for val in val {
528 encode::write_str(writer, &val)?;
529 }
530 }
531 };
532
533 Ok(())
534 }
535}