nt_client/topic.rs
1//! Named data channels.
2//!
3//! Topics have a fixed data type and can be subscribed and published to.
4
5use std::{collections::{HashMap, VecDeque}, fmt::{Debug, Display}, time::Duration};
6
7use serde::{Deserialize, Serialize};
8
9use crate::{ClientHandle, data::{DataType, NetworkTableData}, error::ConnectionClosedError, net::{Announce, Unannounce}, publish::{GenericPublisher, NewPublisherError, Publisher}, subscribe::{Subscriber, SubscriptionOptions}};
10
11pub mod collection;
12
13/// Creates a [`TopicPath`] containing the segments.
14///
15/// `path!` allows for the easy creation of [`TopicPath`]s without having to deal with creating
16/// [`VecDeque`]s.
17/// Its usage is extremely similar to [`std::vec!`].
18///
19/// # Examples
20/// ```
21/// use std::collections::VecDeque;
22/// use nt_client::{topic::TopicPath, path};
23///
24/// let mut vec_deque = VecDeque::new();
25/// vec_deque.push_back("my".to_owned());
26/// vec_deque.push_back("path".to_owned());
27/// let path = TopicPath::new(vec_deque);
28///
29/// assert_eq!(path!["my", "path"], path);
30/// ```
31#[macro_export]
32macro_rules! path {
33 () => {
34 $crate::topic::TopicPath::default();
35 };
36
37 ($($segment: literal),+ $(,)?) => {{
38 let mut segments = std::collections::VecDeque::new();
39 $(
40 segments.push_back($segment.to_string());
41 )*
42 $crate::topic::TopicPath::new(segments)
43 }};
44}
45
46/// Represents a `NetworkTables` topic.
47///
48/// This differs from an [`AnnouncedTopic`], as that is a **server created topic**, while this is a
49/// **client created topic**.
50///
51/// # Examples
52/// ```no_run
53/// use nt_client::{Client, path};
54///
55/// # tokio_test::block_on(async {
56/// let client = Client::new(Default::default());
57///
58/// client.connect_setup(|client| {
59/// // get a topic named `/my/topic` using the path! macro
60/// let mut topic = client.topic(path!["my", "topic"]);
61/// tokio::spawn(async move {
62/// // subscribe to `/mytopic`
63/// // note that this immediately unsubscribes and doesn't actually do anything
64/// topic.subscribe(Default::default()).await;
65///
66/// // mutate the topic, changing its name to `/mytopic`
67/// // this is useful when you don't have access to a Client and need to dynamically
68/// // "create" topics
69/// //
70/// // to see a better example of this, look at the `generic_pub` example
71/// // the subscriber created previously, assuming it isn't immediately dropped,
72/// // will still be subscribing to `/mytopic`
73/// *topic.name_mut() = "/othertopic".to_owned();
74///
75/// // subscribe to `/othertopic`
76/// // note that this immediately unsubscribes and doesn't actually do anything
77/// topic.subscribe(Default::default()).await;
78/// });
79/// }).await.unwrap();
80/// # });
81/// ```
82#[derive(Debug, Clone)]
83pub struct Topic {
84 name: String,
85 handle: ClientHandle,
86}
87
88impl PartialEq for Topic {
89 fn eq(&self, other: &Self) -> bool {
90 self.name == other.name
91 }
92}
93
94impl Eq for Topic { }
95
96impl Topic {
97 pub(super) fn new(
98 name: String,
99 handle: ClientHandle
100 ) -> Self {
101 Self { name, handle }
102 }
103
104 /// Returns a reference to the name this topic has.
105 pub fn name(&self) -> &str {
106 &self.name
107 }
108
109 /// Returns a mutable reference to the name this topic has.
110 pub fn name_mut(&mut self) -> &mut String {
111 &mut self.name
112 }
113
114 /// Creates a child topic with a suffix.
115 ///
116 /// # Examples
117 /// ```no_run
118 /// use nt_client::Client;
119 ///
120 /// let client = Client::new(Default::default());
121 ///
122 /// let root_topic = client.topic("/SmartDashboard");
123 /// let number_topic = root_topic.child("/mynumber");
124 /// let boolean_topic = root_topic.child("/myboolean");
125 ///
126 /// assert_eq!(root_topic.name(), "/SmartDashboard");
127 /// assert_eq!(number_topic.name(), "/SmartDashboard/mynumber");
128 /// assert_eq!(boolean_topic.name(), "/SmartDashboard/myboolean");
129 /// ```
130 pub fn child(&self, name: impl AsRef<str>) -> Self {
131 Self::new(self.name.clone() + name.as_ref(), self.handle.clone())
132 }
133
134 /// Publishes to this topic with the data type `T`.
135 ///
136 /// For a generic-free version, see [`generic_publish`][`Self::generic_publish`].
137 ///
138 /// # Note
139 /// This method requires the [`Client`] websocket connection to already be made. Calling this
140 /// method wihout already connecting the [`Client`] will cause it to hang forever.
141 ///
142 /// # Errors
143 /// Returns an error if a publisher could not be made to the server.
144 ///
145 /// [`Client`]: crate::Client
146 pub async fn publish<T: NetworkTableData>(&self, properties: Properties) -> Result<Publisher<T>, NewPublisherError> {
147 Publisher::new(self.name.clone(), properties, self.handle.time(), self.handle.server_send.clone(), self.handle.client_send.subscribe()).await
148 }
149
150 /// Publishes to this topic with the data type `T`, not waiting for an nnounce message from the
151 /// server. This is meant as a workaround to [issue #7680].
152 ///
153 /// Using this method does not guarantees that the topic has a matching type, nor does it
154 /// guarantee that the publisher was even able to be made. Use at your own risk!
155 ///
156 /// # Note
157 /// This method requires the [`Client`] websocket connection to already be made. Calling this
158 /// method wihout already connecting the [`Client`] will cause it to hang forever.
159 ///
160 /// # Errors
161 /// Returns an error if the `NetworkTables` connection was closed.
162 ///
163 /// [`Client`]: crate::Client
164 /// [issue #7680]: https://github.com/wpilibsuite/allwpilib/issues/7680
165 #[cfg(feature = "publish_bypass")]
166 pub async fn publish_bypass<T: NetworkTableData>(&self, properties: Properties) -> Result<Publisher<T>, ConnectionClosedError> {
167 Publisher::new_bypass(self.name.clone(), properties, self.handle.time(), self.handle.server_send.clone(), self.handle.client_send.subscribe()).await
168 }
169
170 /// Publishes to this topic with some data type.
171 ///
172 /// This behaves differently from [`publish`][`Self::publish`], as that has a generic type and
173 /// guarantees through type-safety that the client is publishing values that have the same type
174 /// the server does for that topic. Extra care must be taken to ensure no type mismatches
175 /// occur.
176 ///
177 /// # Note
178 /// This method requires the [`Client`] websocket connection to already be made. Calling this
179 /// method wihout already connecting the [`Client`] will cause it to hang forever.
180 ///
181 /// # Errors
182 /// Returns an error if a publisher could not be made to the server.
183 ///
184 /// [`Client`]: crate::Client
185 pub async fn generic_publish(&self, r#type: DataType, properties: Properties) -> Result<GenericPublisher, NewPublisherError> {
186 GenericPublisher::new(self.name.clone(), properties, r#type, self.handle.time(), self.handle.server_send.clone(), self.handle.client_send.subscribe()).await
187 }
188
189 /// Publishes to this topic with some data type, not waiting for an nnounce message from the
190 /// server. This is meant as a workaround to [issue #7680].
191 ///
192 /// Using this method does not guarantees that the topic has a matching type, nor does it
193 /// guarantee that the publisher was even able to be made. Use at your own risk!
194 ///
195 /// This behaves differently from [`publish_bypass`][`Self::publish_bypass`], as that has a generic type and
196 /// guarantees through type-safety that the client is publishing values that have the same type
197 /// the server does for that topic. Extra care must be taken to ensure no type mismatches
198 /// occur.
199 ///
200 /// # Note
201 /// This method requires the [`Client`] websocket connection to already be made. Calling this
202 /// method wihout already connecting the [`Client`] will cause it to hang forever.
203 ///
204 /// # Errors
205 /// Returns an error if the `NetworkTables` connection was closed.
206 ///
207 /// [`Client`]: crate::Client
208 /// [issue #7680]: https://github.com/wpilibsuite/allwpilib/issues/7680
209 #[cfg(feature = "publish_bypass")]
210 pub async fn generic_publish_bypass(&self, r#type: DataType, properties: Properties) -> Result<GenericPublisher, ConnectionClosedError> {
211 GenericPublisher::new_bypass(self.name.clone(), properties, r#type, self.handle.time(), self.handle.server_send.clone(), self.handle.client_send.subscribe()).await
212 }
213
214 /// Subscribes to this topic.
215 ///
216 /// This method does not require the [`Client`] websocket connection to be made.
217 ///
218 /// [`Client`]: crate::Client
219 pub async fn subscribe(&self, options: SubscriptionOptions) -> Result<Subscriber, ConnectionClosedError> {
220 Subscriber::new(vec![self.name.clone()], options, self.handle.announced_topics.clone(), self.handle.server_send.clone(), self.handle.client_send.subscribe()).await
221 }
222}
223
224/// A topic that has been announced by the `NetworkTables` server.
225///
226/// Topics will only be announced when there is a subscriber subscribing to it.
227///
228/// This differs from a [`Topic`], as that is a **client created topic**, while this is a
229/// **server created topic**.
230#[derive(Debug, Clone, PartialEq)]
231pub struct AnnouncedTopic {
232 name: String,
233 id: i32,
234 r#type: DataType,
235 pub(crate) properties: Properties,
236 value: Option<rmpv::Value>,
237 last_updated: Option<Duration>,
238}
239
240impl AnnouncedTopic {
241 /// Returns the name of this topic.
242 pub fn name(&self) -> &str {
243 &self.name
244 }
245
246 /// Returns the id of this topic.
247 ///
248 /// This id is guaranteed to be unique.
249 pub fn id(&self) -> i32 {
250 self.id
251 }
252
253 /// Returns the data type of this topic.
254 pub fn r#type(&self) -> &DataType {
255 &self.r#type
256 }
257
258 /// Returns the properties of this topic.
259 pub fn properties(&self) -> &Properties {
260 &self.properties
261 }
262
263 /// Returns the current value of this topic.
264 ///
265 /// This value will not be present if the `cached` property is `false`, regardless of if this
266 /// topic has been published to.
267 pub fn value(&self) -> Option<&rmpv::Value> {
268 self.value.as_ref()
269 }
270
271 /// Returns when this topic was last updated as a duration of time since the server started.
272 ///
273 /// This value will not be present if it has never been published to.
274 pub fn last_updated(&self) -> Option<&Duration> {
275 self.last_updated.as_ref()
276 }
277
278 pub(crate) fn update(&mut self, when: Duration) {
279 self.last_updated = Some(when);
280 }
281
282 pub(crate) fn update_value(&mut self, value: rmpv::Value) {
283 self.value = Some(value);
284 }
285
286 /// Returns whether the given names and subscription options match this topic.
287 pub fn matches(&self, names: &[String], options: &SubscriptionOptions) -> bool {
288 names.iter()
289 .any(|name| &self.name == name || (options.prefix.is_some_and(|flag| flag) && self.name.starts_with(name)))
290 }
291}
292
293impl From<&Announce> for AnnouncedTopic {
294 fn from(value: &Announce) -> Self {
295 Self {
296 name: value.name.clone(),
297 id: value.id,
298 r#type: value.r#type.clone(),
299 properties: value.properties.clone(),
300 value: None,
301 last_updated: None,
302 }
303 }
304}
305
306/// Represents a list of all server-announced topics.
307#[derive(Default, Debug, Clone, PartialEq)]
308pub struct AnnouncedTopics {
309 topics: HashMap<i32, AnnouncedTopic>,
310 name_to_id: HashMap<String, i32>,
311}
312
313impl AnnouncedTopics {
314 /// Creates a new, empty list of announced topics.
315 pub fn new() -> Self {
316 Default::default()
317 }
318
319 pub(crate) fn insert(&mut self, announce: &Announce) {
320 self.topics.insert(announce.id, announce.into());
321 self.name_to_id.insert(announce.name.clone(), announce.id);
322 }
323
324 pub(crate) fn remove(&mut self, unannounce: &Unannounce) {
325 self.topics.remove(&unannounce.id);
326 self.name_to_id.remove(&unannounce.name);
327 }
328
329 /// Gets a topic from its id.
330 pub fn get_from_id(&self, id: i32) -> Option<&AnnouncedTopic> {
331 self.topics.get(&id)
332 }
333
334 /// Gets a mutable topic from its id.
335 pub fn get_mut_from_id(&mut self, id: i32) -> Option<&mut AnnouncedTopic> {
336 self.topics.get_mut(&id)
337 }
338
339 /// Gets a topic from its name.
340 pub fn get_from_name(&self, name: &str) -> Option<&AnnouncedTopic> {
341 self.name_to_id.get(name).and_then(|id| self.topics.get(id))
342 }
343
344 /// Gets a mutable topic from its name.
345 pub fn get_mut_from_name(&mut self, name: &str) -> Option<&mut AnnouncedTopic> {
346 self.name_to_id.get(name).and_then(|id| self.topics.get_mut(id))
347 }
348
349 /// Gets a topic id from its name.
350 pub fn get_id(&self, name: &str) -> Option<i32> {
351 self.name_to_id.get(name).copied()
352 }
353
354 /// An iterator visiting all id and topic values in arbitrary order.
355 pub fn id_values(&self) -> std::collections::hash_map::Values<'_, i32, AnnouncedTopic> {
356 self.topics.values()
357 }
358}
359
360/// Topic properties.
361///
362/// These are properties attached to all topics and are represented as JSON. To add extra
363/// properties, use the `extra` field.
364///
365/// Docs taken and summarized from [here](https://github.com/wpilibsuite/allwpilib/blob/main/ntcore/doc/networktables4.adoc#properties).
366// TODO: test if server recognizes non-bool properties
367#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq, Eq)]
368#[serde(rename_all = "lowercase")]
369pub struct Properties {
370 /// Persistent flag.
371 ///
372 /// If set to `true`, the server will save this value and it will be restored during server
373 /// startup. It will also not be deleted by the server if the last publisher stops publishing.
374 #[serde(skip_serializing_if = "Option::is_none")]
375 pub persistent: Option<bool>,
376 /// Retained flag.
377 ///
378 /// If set to `true`, the server will not delete this topic when the last publisher stops
379 /// publishing.
380 #[serde(skip_serializing_if = "Option::is_none")]
381 pub retained: Option<bool>,
382 /// Cached flag.
383 ///
384 /// If set to `false`, servers and clients will not store the value of this topic meaning only
385 /// values updates will be avaible for the topic.
386 #[serde(skip_serializing_if = "Option::is_none")]
387 pub cached: Option<bool>,
388
389 /// Extra property values.
390 ///
391 /// This should be used for generic properties not officially recognized by a `NetworkTables` server.
392 #[serde(flatten)]
393 pub extra: HashMap<String, serde_json::Value>,
394}
395
396/// Represents a slash (`/`) deliminated path.
397///
398/// This is especially useful when trying to parse nested data, such as from Shuffleboard
399/// (found in `/Shuffleboard/...`).
400///
401/// This can be thought of as a wrapper for a [`VecDeque`], only providing trait impls to convert
402/// to/from a [`String`].
403///
404/// # Note
405/// The [`Display`] impl will always contain a leading slash, but not a trailing one,
406/// regardless of if the path was parsed from a [`String`] containing either a leading or trailing
407/// slash.
408///
409/// # Warning
410/// In cases where slashes are present in segment names, turning to and from a [`String`] is
411/// **NOT** guaranteed to preserve segment names.
412///
413/// ```
414/// use nt_client::{topic::TopicPath, path};
415///
416/// let path = path!["///weird//", "na//mes//"];
417///
418/// assert_ne!(<String as Into<TopicPath>>::into(path.to_string()), path);
419/// ```
420///
421/// In the above example, `.to_string()` is converting the path to `////weird///na//mes//`.
422/// When turning this back into a `TopicPath`, it recognizes the following segments (with
423/// trailing and leading slashes removed):
424///
425/// **/** / **/weird** / **/** / **na** / **/mes** /
426///
427/// # Examples
428/// ```
429/// use nt_client::{topic::TopicPath, path};
430///
431/// // automatically inserts a leading slash
432/// assert_eq!(path!["my", "topic"].to_string(), "/my/topic");
433///
434/// // slashes in the segment names are preserved
435/// assert_eq!(path!["some", "/data"].to_string(), "/some//data");
436///
437/// assert_eq!(<&str as Into<TopicPath>>::into("/path/to/data"), path!["path", "to", "data"]);
438///
439/// assert_eq!(<&str as Into<TopicPath>>::into("//some///weird/path/"), path!["/some", "/", "weird", "path"]);
440/// ```
441/// Getting a topic:
442/// ```no_run
443/// use nt_client::{Client, path};
444///
445/// # tokio_test::block_on(async {
446/// let client = Client::new(Default::default());
447///
448/// let topic = client.topic(path!["nested", "data"]);
449///
450/// // do something with `topic`
451///
452/// client.connect().await;
453/// # });
454/// ```
455/// Parsing topic name:
456/// ```no_run
457/// use nt_client::{topic::TopicPath, subscribe::{ReceivedMessage, SubscriptionOptions}, Client};
458///
459/// # tokio_test::block_on(async {
460/// let client = Client::new(Default::default());
461///
462/// client.connect_setup(setup).await;
463/// # });
464///
465/// fn setup(client: &Client) {
466/// let sub_topic = client.topic("/Root/");
467/// tokio::spawn(async move {
468/// let mut sub = sub_topic.subscribe(SubscriptionOptions {
469/// topics_only: Some(true),
470/// prefix: Some(true),
471/// ..Default::default()
472/// }).await.unwrap();
473///
474/// while let Ok(ReceivedMessage::Announced(topic)) = sub.recv().await {
475/// let path: TopicPath = topic.name().into();
476///
477/// // do something with `path`
478/// }
479/// });
480/// }
481/// ```
482#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
483pub struct TopicPath {
484 /// The segments contained in the path.
485 pub segments: VecDeque<String>,
486}
487
488impl TopicPath {
489 /// The delimiter to use when converting from a [`String`].
490 pub const DELIMITER: char = '/';
491
492 /// Creates a new `TopicPath` with segments.
493 pub fn new(segments: VecDeque<String>) -> Self {
494 Self { segments }
495 }
496}
497
498impl From<VecDeque<String>> for TopicPath {
499 fn from(value: VecDeque<String>) -> Self {
500 Self { segments: value }
501 }
502}
503
504impl Display for TopicPath {
505 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
506 let full_path = self.segments.iter().fold(String::new(), |prev, curr| prev + "/" + curr);
507 f.write_str(&full_path)
508 }
509}
510
511impl From<&str> for TopicPath {
512 fn from(value: &str) -> Self {
513 value.to_string().into()
514 }
515}
516
517impl From<String> for TopicPath {
518 fn from(value: String) -> Self {
519 let str = value.strip_prefix(Self::DELIMITER).unwrap_or(&value);
520 let str = str
521 .strip_suffix(Self::DELIMITER)
522 .map(|str| str.to_owned())
523 .unwrap_or_else(|| str.to_owned());
524
525 str.chars().fold((VecDeque::<String>::new(), true), |(mut parts, prev_is_delimiter), char| {
526 if prev_is_delimiter {
527 parts.push_back(String::from(char));
528 (parts, false)
529 } else {
530 let is_delimiter = char == Self::DELIMITER;
531 if !is_delimiter { parts.back_mut().unwrap().push(char); };
532 (parts, is_delimiter)
533 }
534 }).0.into()
535 }
536}
537
538#[cfg(test)]
539mod tests {
540 use super::*;
541
542 #[test]
543 fn test_single_item() {
544 assert_eq!(into_path("Topic"), path!["Topic"]);
545 assert_eq!(into_path("123thing"), path!["123thing"]);
546
547 assert_eq!(into_path("/mydata"), path!["mydata"]);
548 assert_eq!(into_path("value/"), path!["value"]);
549
550 assert_eq!(into_path("//thing"), path!["/thing"]);
551 assert_eq!(into_path("cooldata//"), path!["cooldata"]);
552 }
553
554 #[test]
555 fn test_multi_item() {
556 assert_eq!(into_path("some/thing"), path!["some", "thing"]);
557 assert_eq!(into_path("Topic/thing/value"), path!["Topic", "thing", "value"]);
558
559 assert_eq!(into_path("/hello/there"), path!["hello", "there"]);
560 assert_eq!(into_path("my/long/path/"), path!["my", "long", "path"]);
561
562 assert_eq!(into_path("//weird///path/and/slash//"), path!["/weird", "/", "path", "and", "slash"]);
563 assert_eq!(into_path("//////"), path!["/", "/"]);
564 }
565
566 #[test]
567 fn test_parse_to_string() {
568 let path = path!["simple"];
569 assert_eq!(into_path(&path.to_string()), path);
570
571 let path = path!["my", "data"];
572 assert_eq!(into_path(&path.to_string()), path);
573
574 let path = path!["/something", "really", "/weird"];
575 assert_eq!(into_path(&path.to_string()), path);
576 }
577
578 fn into_path(s: &str) -> TopicPath {
579 s.into()
580 }
581}
582