ros2_client/
pubsub.rs

1use std::{io, marker::PhantomData};
2
3use mio::{Evented, Poll, PollOpt, Ready, Token};
4use futures::{
5  pin_mut,
6  stream::{FusedStream, StreamExt},
7  Future,
8};
9use rustdds::{
10  dds::{ReadError, ReadResult, WriteResult},
11  serialization::CdrDeserializeSeedDecoder,
12  *,
13};
14use serde::{de::DeserializeOwned, Serialize};
15
16use super::{gid::Gid, message_info::MessageInfo, node::Node};
17
18/// A ROS2 Publisher
19///
20/// Corresponds to a simplified [`DataWriter`](rustdds::no_key::DataWriter)in
21/// DDS
22pub struct Publisher<M: Serialize> {
23  datawriter: no_key::DataWriterCdr<M>,
24}
25
26impl<M: Serialize> Publisher<M> {
27  // These must be created from Node
28  pub(crate) fn new(datawriter: no_key::DataWriterCdr<M>) -> Publisher<M> {
29    Publisher { datawriter }
30  }
31
32  pub fn publish(&self, message: M) -> WriteResult<(), M> {
33    self.datawriter.write(message, Some(Timestamp::now()))
34  }
35
36  // pub(crate) fn publish_with_options(
37  //   &self,
38  //   message: M,
39  //   wo: WriteOptions,
40  // ) -> dds::Result<rustdds::rpc::SampleIdentity> {
41  //   self.datawriter.write_with_options(message, wo)
42  // }
43
44  pub fn assert_liveliness(&self) -> WriteResult<(), ()> {
45    self.datawriter.assert_liveliness()
46  }
47
48  pub fn guid(&self) -> rustdds::GUID {
49    self.datawriter.guid()
50  }
51
52  pub fn gid(&self) -> Gid {
53    self.guid().into()
54  }
55
56  /// Returns the count of currently matched subscribers.
57  ///
58  /// `my_node` must be the Node that created this Publisher, or the result is
59  /// undefined.
60  pub fn get_subscription_count(&self, my_node: &Node) -> usize {
61    my_node.get_subscription_count(self.guid())
62  }
63
64  /// Waits until there is at least one matched subscription on this topic,
65  /// possibly forever.
66  ///
67  /// `my_node` must be the Node that created this Subscription, or the length
68  /// of the wait is undefined.
69  pub fn wait_for_subscription(&self, my_node: &Node) -> impl Future<Output = ()> + Send {
70    my_node.wait_for_reader(self.guid())
71  }
72
73  pub async fn async_publish(&self, message: M) -> WriteResult<(), M> {
74    self
75      .datawriter
76      .async_write(message, Some(Timestamp::now()))
77      .await
78  }
79
80  #[allow(dead_code)] // This is for async Service implementation. Remove this when it is implemented.
81  pub(crate) async fn async_publish_with_options(
82    &self,
83    message: M,
84    wo: WriteOptions,
85  ) -> dds::WriteResult<rustdds::rpc::SampleIdentity, M> {
86    self.datawriter.async_write_with_options(message, wo).await
87  }
88}
89// ----------------------------------------------------
90// ----------------------------------------------------
91// ----------------------------------------------------
92// ----------------------------------------------------
93// ----------------------------------------------------
94
95/// A ROS2 Subscription
96///
97/// Corresponds to a (simplified) [`DataReader`](rustdds::no_key::DataReader) in
98/// DDS
99pub struct Subscription<M> {
100  datareader: no_key::SimpleDataReaderCdr<M>,
101}
102
103impl<M> Subscription<M>
104where
105  M: 'static,
106{
107  // These must be created from Node
108  pub(crate) fn new(datareader: no_key::SimpleDataReaderCdr<M>) -> Subscription<M> {
109    Subscription { datareader }
110  }
111
112  pub fn take_seed<'de, S>(&self, seed: S) -> ReadResult<Option<(M, MessageInfo)>>
113  where
114    S: serde::de::DeserializeSeed<'de, Value = M> + Clone,
115    M: 'static,
116  {
117    self.datareader.drain_read_notifications();
118    let decoder = CdrDeserializeSeedDecoder::new(seed, PhantomData::<()>);
119    let ds: Option<no_key::DeserializedCacheChange<M>> =
120      self.datareader.try_take_one_with(decoder)?;
121    Ok(ds.map(dcc_to_value_and_messageinfo))
122  }
123
124  // Returns an async Stream of messages with MessageInfo metadata
125  pub fn async_stream_seed<'a, 'de, S>(
126    &'a self,
127    seed: S,
128  ) -> impl FusedStream<Item = ReadResult<(M, MessageInfo)>> + 'a
129  where
130    S: serde::de::DeserializeSeed<'de, Value = M> + Clone + 'a,
131    M: 'static,
132  {
133    let decoder = CdrDeserializeSeedDecoder::new(seed, PhantomData::<()>);
134    self
135      .datareader
136      .as_async_stream_with(decoder)
137      .map(|result| result.map(dcc_to_value_and_messageinfo))
138  }
139}
140
141impl<M: 'static + DeserializeOwned> Subscription<M> {
142  pub fn take(&self) -> ReadResult<Option<(M, MessageInfo)>> {
143    self.datareader.drain_read_notifications();
144    let ds: Option<no_key::DeserializedCacheChange<M>> = self.datareader.try_take_one()?;
145    Ok(ds.map(dcc_to_value_and_messageinfo))
146  }
147
148  pub async fn async_take(&self) -> ReadResult<(M, MessageInfo)> {
149    let async_stream = self.datareader.as_async_stream();
150    pin_mut!(async_stream);
151    match async_stream.next().await {
152      Some(Err(e)) => Err(e),
153      Some(Ok(ds)) => Ok(dcc_to_value_and_messageinfo(ds)),
154      // Stream from SimpleDataReader is not supposed to ever end.
155      None => {
156        read_error_internal!("async_take(): SimpleDataReader value stream unexpectedly ended!")
157      }
158    }
159  }
160
161  // Returns an async Stream of messages with MessageInfo metadata
162  pub fn async_stream(&self) -> impl FusedStream<Item = ReadResult<(M, MessageInfo)>> + '_ {
163    self
164      .datareader
165      .as_async_stream()
166      .map(|result| result.map(dcc_to_value_and_messageinfo))
167  }
168}
169
170impl<M> Subscription<M>
171where
172  M: 'static,
173{
174  pub fn guid(&self) -> rustdds::GUID {
175    self.datareader.guid()
176  }
177
178  pub fn gid(&self) -> Gid {
179    self.guid().into()
180  }
181
182  /// Returns the count of currently matched Publishers.
183  ///
184  /// `my_node` must be the Node that created this Subscription, or the result
185  /// is undefined.
186  pub fn get_publisher_count(&self, my_node: &Node) -> usize {
187    my_node.get_publisher_count(self.guid())
188  }
189
190  /// Waits until there is at least one matched publisher on this topic,
191  /// possibly forever.
192  ///
193  /// `my_node` must be the Node that created this Subscription, or the length
194  /// of the wait is undefined.
195  pub fn wait_for_publisher(&self, my_node: &Node) -> impl Future<Output = ()> + Send {
196    my_node.wait_for_writer(self.guid())
197  }
198}
199
200// helper
201#[inline]
202fn dcc_to_value_and_messageinfo<M>(dcc: no_key::DeserializedCacheChange<M>) -> (M, MessageInfo) {
203  let mi = MessageInfo::from(&dcc);
204  (dcc.into_value(), mi)
205}
206
207impl<D> Evented for Subscription<D>
208where
209  D: DeserializeOwned,
210{
211  // We just delegate all the operations to datareader, since it
212  // already implements Evented
213  fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
214    self.datareader.register(poll, token, interest, opts)
215  }
216
217  fn reregister(
218    &self,
219    poll: &Poll,
220    token: Token,
221    interest: Ready,
222    opts: PollOpt,
223  ) -> io::Result<()> {
224    self.datareader.reregister(poll, token, interest, opts)
225  }
226
227  fn deregister(&self, poll: &Poll) -> io::Result<()> {
228    self.datareader.deregister(poll)
229  }
230}