ros2_client/service/
client.rs

1use std::{io, sync::atomic};
2
3use mio::{Evented, Poll, PollOpt, Ready, Token};
4#[allow(unused_imports)]
5use log::{debug, error, info, warn};
6use futures::{join, pin_mut, StreamExt};
7use rustdds::{
8  dds::{CreateResult, ReadError, ReadResult, WriteError, WriteResult},
9  rpc::*,
10  *,
11};
12
13use crate::{message_info::MessageInfo, node::Node, service::*};
14
/// Client end of a ROS2 Service
///
/// Holds a writer for wrapped `S::Request` messages and a reader for wrapped
/// `S::Response` messages, plus the state needed to generate request ids.
pub struct Client<S>
where
  S: Service,
  S::Request: Message,
  S::Response: Message,
{
  // Passed along when wrapping requests and unwrapping responses, and
  // consulted to decide which request id is reported back to the caller.
  service_mapping: ServiceMapping,
  // Writer used to publish wrapped requests.
  request_sender: DataWriterR<RequestWrapper<S::Request>>,
  // Reader used to take wrapped responses; also backs the `Evented` impl.
  response_receiver: SimpleDataReaderR<ResponseWrapper<S::Response>>,
  // Counter from which request sequence numbers are generated.
  sequence_number_gen: atomic::AtomicI64, // used by basic and cyclone
  // GUID of `request_sender`, captured once at construction.
  client_guid: GUID,                      // used by the Cyclone ServiceMapping
}
28
29impl<S> Client<S>
30where
31  S: 'static + Service,
32{
33  pub(crate) fn new(
34    service_mapping: ServiceMapping,
35    node: &mut Node,
36    request_topic: &Topic,
37    response_topic: &Topic,
38    qos_request: Option<QosPolicies>,
39    qos_response: Option<QosPolicies>,
40  ) -> CreateResult<Self> {
41    let request_sender =
42      node.create_datawriter
43      ::<RequestWrapper<S::Request>, ServiceSerializerAdapter<RequestWrapper<S::Request>>>(
44        request_topic, qos_request)?;
45    let response_receiver =
46      node.create_simpledatareader
47      ::<ResponseWrapper<S::Response>, ServiceDeserializerAdapter<ResponseWrapper<S::Response>>>(
48        response_topic, qos_response)?;
49
50    debug!(
51      "Created new Client: request={} response={}",
52      request_topic.name(),
53      response_topic.name()
54    );
55    let client_guid = request_sender.guid();
56    Ok(Client::<S> {
57      service_mapping,
58      request_sender,
59      response_receiver,
60      sequence_number_gen: atomic::AtomicI64::new(SequenceNumber::default().into()),
61      client_guid,
62    })
63  }
64
65  /// Send a request to Service Server.
66  /// The returned `RmwRequestId` is a token to identify the correct response.
67  pub fn send_request(&self, request: S::Request) -> WriteResult<RmwRequestId, ()> {
68    self.increment_sequence_number();
69    let gen_rmw_req_id = RmwRequestId {
70      writer_guid: self.client_guid,
71      sequence_number: self.sequence_number(),
72    };
73    let req_wrapper = RequestWrapper::<S::Request>::new(
74      self.service_mapping,
75      gen_rmw_req_id,
76      RepresentationIdentifier::CDR_LE,
77      request,
78    )?;
79    let write_opts_builder = WriteOptionsBuilder::new().source_timestamp(Timestamp::now()); // always add source timestamp
80
81    let write_opts_builder = if self.service_mapping == ServiceMapping::Enhanced {
82      write_opts_builder
83    } else {
84      write_opts_builder.related_sample_identity(SampleIdentity::from(gen_rmw_req_id))
85    };
86    let sent_rmw_req_id = self
87      .request_sender
88      .write_with_options(req_wrapper, write_opts_builder.build())
89      .map(RmwRequestId::from)
90      .map_err(|e| e.forget_data())?;
91
92    match self.service_mapping {
93      ServiceMapping::Enhanced => Ok(sent_rmw_req_id),
94      ServiceMapping::Basic | ServiceMapping::Cyclone => Ok(gen_rmw_req_id),
95    }
96  }
97
98  /// Receive a response from Server
99  /// Returns `Ok(None)` if no new responses have arrived.
100  /// Note: The response may to someone else's request. Check received
101  /// `RmWRequestId` against the one you got when sending request to identify
102  /// the correct response. In case you receive someone else's response,
103  /// please do receive again.
104  pub fn receive_response(&self) -> ReadResult<Option<(RmwRequestId, S::Response)>> {
105    self.response_receiver.drain_read_notifications();
106    let dcc_rw: Option<no_key::DeserializedCacheChange<ResponseWrapper<S::Response>>> =
107      self.response_receiver.try_take_one()?;
108
109    match dcc_rw {
110      None => Ok(None),
111      Some(dcc) => {
112        let mi = MessageInfo::from(&dcc);
113        let res_wrapper = dcc.into_value();
114        let (ri, res) = res_wrapper.unwrap(self.service_mapping, mi, self.client_guid)?;
115        Ok(Some((ri, res)))
116      }
117    } // match
118  }
119
120  /// Send a request to Service Server asynchronously.
121  /// The returned `RmwRequestId` is a token to identify the correct response.
122  pub async fn async_send_request(&self, request: S::Request) -> WriteResult<RmwRequestId, ()> {
123    let gen_rmw_req_id =
124      // we do the req_id generation in an async block so that we do not generate
125      // multiple sequence numbers if there are multiple polls to this function
126      async {
127        self.increment_sequence_number();
128         RmwRequestId {
129          writer_guid: self.client_guid,
130          sequence_number: self.sequence_number(),
131        }
132      }.await;
133
134    let req_wrapper = RequestWrapper::<S::Request>::new(
135      self.service_mapping,
136      gen_rmw_req_id,
137      RepresentationIdentifier::CDR_LE,
138      request,
139    )?;
140    let write_opts_builder = WriteOptionsBuilder::new().source_timestamp(Timestamp::now()); // always add source timestamp
141
142    let write_opts_builder = if self.service_mapping == ServiceMapping::Enhanced {
143      write_opts_builder
144    } else {
145      write_opts_builder.related_sample_identity(SampleIdentity::from(gen_rmw_req_id))
146    };
147    let sent_rmw_req_id = self
148      .request_sender
149      .async_write_with_options(req_wrapper, write_opts_builder.build())
150      .await
151      .map(RmwRequestId::from)
152      .map_err(|e| e.forget_data())?;
153
154    let req_id = match self.service_mapping {
155      ServiceMapping::Enhanced => sent_rmw_req_id,
156      ServiceMapping::Basic | ServiceMapping::Cyclone => gen_rmw_req_id,
157    };
158    debug!(
159      "Sent Request {:?} to {:?}",
160      req_id,
161      self.request_sender.topic().name()
162    );
163    Ok(req_id)
164  }
165
166  /// Receive a response from Server
167  /// The returned Future does not complete until the response has been
168  /// received.
169  pub async fn async_receive_response(&self, request_id: RmwRequestId) -> ReadResult<S::Response> {
170    let dcc_stream = self.response_receiver.as_async_stream();
171    pin_mut!(dcc_stream);
172
173    loop {
174      match dcc_stream.next().await {
175        Some(Err(e)) => return Err(e),
176        Some(Ok(dcc)) => {
177          let mi = MessageInfo::from(&dcc);
178          let (req_id, response) =
179            dcc
180              .into_value()
181              .unwrap(self.service_mapping, mi, self.client_guid)?;
182          if req_id == request_id {
183            return Ok(response);
184          } else {
185            debug!(
186              "Received response for someone else. expected={request_id:?}  received={req_id:?}"
187            );
188            continue; //
189          }
190        }
191        // This should never occur, because topic do not "end".
192        None => return read_error_internal!("SimpleDataReader value stream unexpectedly ended!"),
193      }
194    } // loop
195  }
196
197  pub async fn async_call_service(
198    &self,
199    request: S::Request,
200  ) -> Result<S::Response, CallServiceError<()>> {
201    let req_id = self.async_send_request(request).await?;
202    self
203      .async_receive_response(req_id)
204      .await
205      .map_err(CallServiceError::from)
206  }
207
208  /// Wait for a Server to be connected to the Request and Response topics.
209  ///
210  /// This does not distinguish between diagnostinc tools and actual servers.
211  /// It is enough that someone has subscribed the Requests, and someone is
212  /// a publisher for Responses.
213  ///
214  /// May panic, if the Node does not havea background Spinner running.
215  pub async fn wait_for_service(&self, my_node: &Node) {
216    join!(
217      my_node.wait_for_reader(self.request_sender.guid()),
218      my_node.wait_for_writer(self.response_receiver.guid())
219    );
220  }
221
222  fn increment_sequence_number(&self) {
223    self
224      .sequence_number_gen
225      .fetch_add(1, atomic::Ordering::Acquire);
226  }
227
228  fn sequence_number(&self) -> request_id::SequenceNumber {
229    self
230      .sequence_number_gen
231      .load(atomic::Ordering::Acquire)
232      .into()
233  }
234}
235
/// Error of a combined send-request / receive-response service call.
///
/// `T` is the payload type parameter of the wrapped `WriteError`.
#[derive(Debug)]
pub enum CallServiceError<T> {
  /// Wraps a `WriteError`.
  WriteError(WriteError<T>),
  /// Wraps a `ReadError`.
  ReadError(ReadError),
}
241impl<T> From<WriteError<T>> for CallServiceError<T> {
242  fn from(value: WriteError<T>) -> Self {
243    CallServiceError::WriteError(value)
244  }
245}
246impl<T> From<ReadError> for CallServiceError<T> {
247  fn from(value: ReadError) -> Self {
248    CallServiceError::ReadError(value)
249  }
250}
251
252impl<S> Evented for Client<S>
253where
254  S: 'static + Service,
255{
256  fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
257    self.response_receiver.register(poll, token, interest, opts)
258  }
259
260  fn reregister(
261    &self,
262    poll: &Poll,
263    token: Token,
264    interest: Ready,
265    opts: PollOpt,
266  ) -> io::Result<()> {
267    self
268      .response_receiver
269      .reregister(poll, token, interest, opts)
270  }
271
272  fn deregister(&self, poll: &Poll) -> io::Result<()> {
273    self.response_receiver.deregister(poll)
274  }
275}