// nt_rs/types.rs

1use std::string::FromUtf8Error;
2
3use rmp::{
4    decode::{self, NumValueReadError, ValueReadError},
5    encode::{self, ValueWriteError},
6};
7use serde::{Deserialize, Serialize};
8use thiserror::Error;
9
10fn should_skip(val: &MissingOrNull<bool>) -> bool {
11    *val == MissingOrNull::Missing
12}
13
/// Predicate for `#[serde(skip_serializing_if = "skip_none")]`.
///
/// Returns `true` when `val` is `None`, so unset optional fields are omitted from the output
/// instead of being written as an explicit null.
fn skip_none<T>(val: &Option<T>) -> bool {
    val.is_none()
}
17
18/// Each published topic may also have properties associated to it. Properties are represented in
19/// the protocol as JSON and thus property values may be any JSON type. Property keys must be
20/// strings. The following properties have a defined meaning in this spec. Servers shall support
21/// arbitrary properties being set outside of this set. Clients shall ignore properties they do not
22/// recognize. Properties are initially set on publish and may be changed (by any client) using
23/// [TextMessage::SetProperties]
24#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
25pub struct Properties {
26    /// If true, the last set value will be periodically saved to persistent storage on the server
27    /// and be restored during server startup. Topics with this property set to true will not be
28    /// deleted by the server when the last publisher stops publishing.
29    #[serde(
30        with = "missing_or_null_impls",
31        default,
32        skip_serializing_if = "should_skip"
33    )]
34    pub persistent: MissingOrNull<bool>,
35
36    /// Topics with this property set to true will not be deleted by the server when the last
37    /// publisher stops publishing.
38    #[serde(
39        with = "missing_or_null_impls",
40        default,
41        skip_serializing_if = "should_skip"
42    )]
43    pub retained: MissingOrNull<bool>,
44
45    /// If false, the server and clients will not store the value of the topic. This means that
46    /// only value updates will be available for the topic.
47    #[serde(
48        with = "missing_or_null_impls",
49        default,
50        skip_serializing_if = "should_skip"
51    )]
52    pub cached: MissingOrNull<bool>,
53}
54
55impl Default for Properties {
56    fn default() -> Self {
57        Self {
58            persistent: Default::default(),
59            retained: Default::default(),
60            cached: Default::default(),
61        }
62    }
63}
64
65impl Properties {
66    pub fn update(&mut self, other: Properties) {
67        self.persistent.update(other.persistent);
68        self.retained.update(other.retained);
69        self.cached.update(other.cached);
70    }
71}
72
73mod missing_or_null_impls {
74    use serde::{Deserialize, Deserializer, Serialize, Serializer};
75
76    use super::MissingOrNull;
77
78    pub fn serialize<S: Serializer>(
79        value: &MissingOrNull<bool>,
80        serializer: S,
81    ) -> Result<S::Ok, S::Error> {
82        <Option<bool>>::from(value.to_owned()).serialize(serializer)
83    }
84
85    pub fn deserialize<'de, D: Deserializer<'de>>(
86        deserializer: D,
87    ) -> Result<MissingOrNull<bool>, D::Error> {
88        <Option<bool>>::deserialize(deserializer).map(|option| option.into())
89    }
90}
91
92/// Each subscription may have options set. The following options have a defined meaning in this
93/// spec. Servers shall preserve arbitrary options, as servers and clients may support arbitrary
94/// options outside of this set. Options are set using Subscribe Message ([TextMessage::Subscribe])
95/// and cannot be changed.
96#[derive(Serialize, Deserialize, Debug)]
97pub struct SubscriptionOptions {
98    /// How frequently the server should send changes. The server may send more frequently than
99    /// this (e.g. use a combined minimum period for all values) or apply a restricted range to
100    /// this value. The default if unspecified is 100 ms (same as NT 3.0).
101    #[serde(skip_serializing_if = "skip_none", default)]
102    pub periodic: Option<u32>,
103
104    /// If true, the server should send all value changes over the wire. If false, only the most
105    /// recent value is sent (same as NT 3.0 behavior). If not specified, defaults to false.
106    #[serde(skip_serializing_if = "skip_none", default)]
107    pub all: Option<bool>,
108
109    /// If true, the server should not send any value changes over the wire regardless of other
110    /// options. This is useful for only getting topic announcements. If false, value changes are
111    /// sent in accordance with other options. If not specified, defaults to false.
112    #[serde(skip_serializing_if = "skip_none", default)]
113    pub topicsonly: Option<bool>,
114
115    /// If true, any topic starting with the name in the subscription topics list is subscribed to,
116    /// not just exact matches. If not specified, defaults to false.
117    #[serde(skip_serializing_if = "skip_none", default)]
118    pub prefix: Option<bool>,
119}
120
121impl Default for SubscriptionOptions {
122    fn default() -> Self {
123        Self {
124            periodic: None,
125            all: None,
126            topicsonly: None,
127            prefix: None,
128        }
129    }
130}
131
132#[derive(Serialize, Deserialize, Debug)]
133#[serde(tag = "method", content = "params")]
134pub enum TextMessage {
135    /// Sent from a client to the server to indicate the client wants to start publishing values at
136    /// the given topic. The server shall respond with a Topic Announcement Message
137    /// ([TextMessage::Announce]), even if the topic was previously announced. The client can start
138    /// publishing data values via MessagePack messages immediately after sending this message, but
139    /// the messages will be ignored by the server if the publisher data type does not match the
140    /// topic data type.
141    #[serde(rename = "publish")]
142    Publish {
143        /// The topic name being published
144        name: String,
145
146        /// A client-generated unique identifier for this publisher. Use the same UID later to
147        /// unpublish. This is also the identifier that the client will use in MessagePack messages
148        /// for this topic.
149        pubuid: u32,
150
151        /// The requested data type (as a string).
152        ///
153        /// If the topic is newly created (e.g. there are no other publishers) this sets the value
154        /// type. If the topic was previously published, this is ignored. The
155        /// [TextMessage::Announce] message contains the actual topic value type that the client
156        /// shall use when publishing values.
157        ///
158        /// Implementations should indicate an error if the user tries to publish an incompatible
159        /// type to that already set for the topic.
160        #[serde(rename = "type")]
161        data_type: String, // TODO: Make real type
162
163        /// Initial topic properties.
164        ///
165        /// If the topic is newly created (e.g. there are no other publishers) this sets the topic
166        /// properties. If the topic was previously published, this is ignored. The
167        /// [TextMessage::Announce] message contains the actual topic properties. Clients can use
168        /// the [TextMessage::SetProperties] message to change properties after topic creation.
169        properties: Properties,
170    },
171
172    /// Sent from a client to the server to indicate the client wants to stop publishing values for
173    /// the given topic and publisher. The client should stop publishing data value updates via
174    /// binary MessagePack messages for this publisher prior to sending this message.
175    ///
176    /// When there are no remaining publishers for a non-persistent topic, the server shall delete
177    /// the topic and send a Topic Removed Message ([TextMessage::Unannounce]) to all clients who
178    /// have been sent a previous Topic Announcement Message ([TextMessage::Announce]) for the
179    /// topic.
180    #[serde(rename = "unpublish")]
181    Unpublish {
182        /// The same unique identifier passed to the [TextMessage::Publish] message
183        pubuid: u32,
184    },
185
186    /// Sent from a client to the server to change properties (see [Properties]) for a given topic.
187    /// The server will send a corresponding Properties Update Message ([TextMessage::Properties])
188    /// to all subscribers to the topic (if the topic is published). This message shall be ignored
189    /// by the server if the topic is not published.
190    #[serde(rename = "setproperties")]
191    SetProperties { name: String, update: Properties },
192
193    /// Sent from a client to the server to indicate the client wants to subscribe to value changes
194    /// for the specified topics / groups of topics. The server shall send MessagePack messages
195    /// containing the current values for any existing cached topics upon receipt, and continue
196    /// sending MessagePack messages for future value changes. If a topic does not yet exist, no
197    /// message is sent until it is created (via a publish), at which point a Topic Announcement
198    /// Message ([TextMessage::Announce]) will be sent and MessagePack messages will automatically
199    /// follow as they are published.
200    ///
201    /// Subscriptions may overlap; only one MessagePack message is sent per value change regardless
202    /// of the number of subscriptions. Sending a subscribe message with the same subscription UID
203    /// as a previous subscribe message results in updating the subscription (replacing the array
204    /// of identifiers and updating any specified options).
205    #[serde(rename = "subscribe")]
206    Subscribe {
207        /// One or more topic names or prefixes (if the prefix option is true) to start receiving
208        /// messages for.
209        topics: Vec<String>,
210
211        /// A client-generated unique identifier for this subscription. Use the same UID later to
212        /// unsubscribe.
213        subuid: u32,
214
215        /// [SubscriptionOptions]
216        options: SubscriptionOptions,
217    },
218
219    /// Sent from a client to the server to indicate the client wants to stop subscribing to
220    /// messages for the given subscription.
221    #[serde(rename = "unsubscribe")]
222    Unsubscribe {
223        /// The same unique identifier passed to the [TextMessage::Subscribe] message
224        subuid: u32,
225    },
226
227    /// The server shall send this message for each of the following conditions:
228    /// - To all clients subscribed to a matching prefix when a topic is created
229    /// - To a client in response to an Publish Request Message ([TextMessage::Publish]) from that client
230    #[serde(rename = "announce")]
231    Announce {
232        name: String,
233
234        /// The identifier that the server will use in MessagePack messages for this topic
235        id: u32,
236
237        /// The data type for the topic (as a string)
238        #[serde(rename = "type")]
239        data_type: String,
240
241        /// If this message was sent in response to a [TextMessage::Publish] message, the Publisher UID provided
242        /// in that message. Otherwise absent.
243        pubuid: Option<u32>,
244
245        /// Topic [Properties]
246        properties: Properties,
247    },
248
249    /// The server shall send this message when a previously announced (via a Topic Announcement
250    /// Message ([TextMessage::Announce])) topic is deleted.
251    #[serde(rename = "unannounce")]
252    Unannounce {
253        name: String,
254
255        /// The identifier that the server was using for value updates
256        id: u32,
257    },
258
259    /// The server shall send this message when a previously announced (via a Topic Announcement
260    /// Message ([TextMessage::Announce])) topic has its properties changed (via Set Properties Message
261    /// ([TextMessage::SetProperties])).
262    #[serde(rename = "properties")]
263    Properties {
264        name: String,
265
266        /// True if this message is in response to a [TextMessage::SetProperties] message from the
267        /// same client. Otherwise absent.
268        ack: bool,
269
270        /// The client shall handle the update value as follows. If a property is not included in
271        /// the update map, its value is not changed. If a property is provided in the update map
272        /// with a value of null, the property is deleted.
273        update: Properties,
274    },
275}
276
/// A tri-state value that separates "key not present", "key present but null" and
/// "key present with a value".
///
/// A plain `Option<T>` cannot tell the first two apart, which property updates need: see
/// [MissingOrNull::update], where `Missing` means "leave unchanged" and `Null` overwrites.
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum MissingOrNull<T> {
    /// The key was not present at all.
    Missing,
    /// The key was present with an explicit null.
    Null,
    /// The key was present with a concrete value.
    Value(T),
}

impl<T> From<Option<T>> for MissingOrNull<T> {
    /// `Some(v)` becomes `Value(v)` and `None` becomes `Null`. This conversion never yields
    /// `Missing`.
    fn from(value: Option<T>) -> Self {
        value.map_or(MissingOrNull::Null, MissingOrNull::Value)
    }
}

impl<T> From<MissingOrNull<T>> for Option<T> {
    /// `Value(v)` becomes `Some(v)`; both `Missing` and `Null` collapse to `None`.
    fn from(value: MissingOrNull<T>) -> Option<T> {
        match value {
            MissingOrNull::Value(val) => Some(val),
            MissingOrNull::Missing | MissingOrNull::Null => None,
        }
    }
}

impl<T> Default for MissingOrNull<T> {
    /// The default state is `Missing`.
    fn default() -> Self {
        Self::Missing
    }
}

impl<T> MissingOrNull<T> {
    /// Apply an update: `Missing` is a no-op, while `Null` or `Value(_)` replaces `self`.
    pub fn update(&mut self, other: Self) {
        if !matches!(other, MissingOrNull::Missing) {
            *self = other;
        }
    }
}
319
320/// A single binary message that could be sent in a binary websocket frame
321#[derive(Debug)]
322pub struct BinaryMessage {
323    pub id: i64,
324    pub timestamp: u64,
325    pub data: BinaryData,
326}
327
328impl BinaryMessage {
329    /// Decode one entire message
330    pub fn from_reader<R: std::io::Read>(reader: &mut R) -> Result<Self, BinaryMessageError> {
331        let len = decode::read_array_len(reader)?;
332
333        if len != 4 {
334            Err(BinaryMessageError::MessageLen(len))
335        } else {
336            Ok(Self {
337                id: decode::read_int(reader)?,
338                timestamp: decode::read_int(reader)?,
339                data: BinaryData::from_reader(reader)?,
340            })
341        }
342    }
343
344    /// Enocde this message onto a writer
345    pub fn to_writer<W: std::io::Write>(&self, writer: &mut W) -> Result<(), BinaryMessageError> {
346        encode::write_array_len(writer, 4)?;
347        encode::write_sint(writer, self.id)?;
348        encode::write_uint(writer, self.timestamp)?;
349        self.data.to_writer(writer)?;
350        Ok(())
351    }
352}
353
/// All defined types that could be sent in binary frames
///
/// Each variant is identified on the wire by an integer type tag, listed per variant below.
#[derive(Debug, Clone, PartialEq)]
pub enum BinaryData {
    /// Type tag 0.
    Boolean(bool),
    /// Type tag 1.
    Double(f64),
    /// Type tag 2.
    Int(i64),
    /// Type tag 3.
    Float(f32),
    /// Type tag 4.
    Str(String),
    /// Type tag 5: raw bytes.
    Bin(Vec<u8>),
    /// Type tag 16.
    BoolArray(Vec<bool>),
    /// Type tag 17.
    DoubleArray(Vec<f64>),
    /// Type tag 18.
    IntArray(Vec<i64>),
    /// Type tag 19.
    FloatArray(Vec<f32>),
    /// Type tag 20.
    StringArray(Vec<String>),
}
369
370#[derive(Debug, Error)]
371pub enum BinaryMessageError {
372    #[error("Could not parse number: {0}")]
373    IntError(#[from] NumValueReadError<std::io::Error>),
374    #[error("Could not read value: {0}")]
375    ValueReadError(#[from] ValueReadError<std::io::Error>),
376    #[error("Could not write value: {0}")]
377    ValueWriteError(#[from] ValueWriteError<std::io::Error>),
378    #[error("Unknown data type: {0}")]
379    UnknownDataType(u8),
380    #[error("Could not parse utf8 while parsing a string: {0}")]
381    InvalidUTF8(#[from] FromUtf8Error),
382    #[error("Encountered an error when reading more data: {0}")]
383    IoError(#[from] std::io::Error),
384    #[error("Incorrect binary message length, expected 4, found {0}")]
385    MessageLen(u32),
386}
387
388impl BinaryData {
389    /// Decode a single chunk of binary data from a reader
390    pub fn from_reader<R: std::io::Read>(reader: &mut R) -> Result<Self, BinaryMessageError> {
391        let data_type: u8 = decode::read_int(reader)?;
392
393        let data = match data_type {
394            0 => BinaryData::Boolean(decode::read_bool(reader)?),
395            1 => BinaryData::Double(decode::read_f64(reader)?),
396            2 => BinaryData::Int(decode::read_int(reader)?),
397            3 => BinaryData::Float(decode::read_f32(reader)?),
398            4 => {
399                let len = decode::read_str_len(reader)?;
400                let mut data = vec![0; len as usize];
401                reader.read_exact(&mut data)?;
402
403                BinaryData::Str(String::from_utf8(data)?)
404            }
405            5 => {
406                let len = decode::read_bin_len(reader)?;
407                let mut data = vec![0; len as usize];
408                reader.read_exact(&mut data)?;
409
410                BinaryData::Bin(data)
411            }
412            16 => {
413                let len = decode::read_array_len(reader)?;
414
415                BinaryData::BoolArray(
416                    (0..len)
417                        .map(|_| decode::read_bool(reader))
418                        .collect::<Result<_, _>>()?,
419                )
420            }
421            17 => {
422                let len = decode::read_array_len(reader)?;
423
424                BinaryData::DoubleArray(
425                    (0..len)
426                        .map(|_| decode::read_f64(reader))
427                        .collect::<Result<_, _>>()?,
428                )
429            }
430            18 => {
431                let len = decode::read_array_len(reader)?;
432
433                BinaryData::IntArray(
434                    (0..len)
435                        .map(|_| decode::read_int(reader))
436                        .collect::<Result<_, _>>()?,
437                )
438            }
439            19 => {
440                let len = decode::read_array_len(reader)?;
441
442                BinaryData::FloatArray(
443                    (0..len)
444                        .map(|_| decode::read_f32(reader))
445                        .collect::<Result<_, _>>()?,
446                )
447            }
448            20 => {
449                let len = decode::read_array_len(reader)?;
450
451                BinaryData::StringArray(
452                    (0..len)
453                        .map(|_| -> Result<String, BinaryMessageError> {
454                            let len = decode::read_str_len(reader)?;
455                            let mut data = vec![0; len as usize];
456                            reader.read_exact(&mut data)?;
457
458                            Ok(String::from_utf8(data)?)
459                        })
460                        .collect::<Result<_, _>>()?,
461                )
462            }
463            n => return Err(BinaryMessageError::UnknownDataType(n)),
464        };
465
466        Ok(data)
467    }
468
469    /// Encode this binary payload to the wire
470    pub fn to_writer<W: std::io::Write>(&self, writer: &mut W) -> Result<(), BinaryMessageError> {
471        match self {
472            BinaryData::Boolean(val) => {
473                encode::write_uint(writer, 0)?;
474                encode::write_bool(writer, *val)?;
475            }
476            BinaryData::Double(val) => {
477                encode::write_uint(writer, 1)?;
478                encode::write_f64(writer, *val)?;
479            }
480            BinaryData::Int(val) => {
481                encode::write_uint(writer, 2)?;
482                encode::write_sint(writer, *val)?;
483            }
484            BinaryData::Float(val) => {
485                encode::write_uint(writer, 3)?;
486                encode::write_f32(writer, *val)?;
487            }
488            BinaryData::Str(val) => {
489                encode::write_uint(writer, 4)?;
490                encode::write_str(writer, &val)?;
491            }
492            BinaryData::Bin(val) => {
493                encode::write_uint(writer, 5)?;
494                encode::write_bin(writer, &val)?;
495            }
496            BinaryData::BoolArray(val) => {
497                encode::write_uint(writer, 16)?;
498                encode::write_array_len(writer, val.len() as u32)?;
499                for val in val {
500                    encode::write_bool(writer, *val)?;
501                }
502            }
503            BinaryData::DoubleArray(val) => {
504                encode::write_uint(writer, 17)?;
505                encode::write_array_len(writer, val.len() as u32)?;
506                for val in val {
507                    encode::write_f64(writer, *val)?;
508                }
509            }
510            BinaryData::IntArray(val) => {
511                encode::write_uint(writer, 18)?;
512                encode::write_array_len(writer, val.len() as u32)?;
513                for val in val {
514                    encode::write_sint(writer, *val)?;
515                }
516            }
517            BinaryData::FloatArray(val) => {
518                encode::write_uint(writer, 19)?;
519                encode::write_array_len(writer, val.len() as u32)?;
520                for val in val {
521                    encode::write_f32(writer, *val)?;
522                }
523            }
524            BinaryData::StringArray(val) => {
525                encode::write_uint(writer, 20)?;
526                encode::write_array_len(writer, val.len() as u32)?;
527                for val in val {
528                    encode::write_str(writer, &val)?;
529                }
530            }
531        };
532
533        Ok(())
534    }
535}