ros2_client/service/
client.rs

1use std::{io, sync::atomic};
2
3use mio::{Evented, Poll, PollOpt, Ready, Token};
4#[allow(unused_imports)]
5use log::{debug, error, info, warn};
6use futures::{join, pin_mut, StreamExt};
7use rustdds::{
8  dds::{CreateResult, ReadError, ReadResult, WriteError, WriteResult},
9  rpc::*,
10  *,
11};
12
13use crate::{message_info::MessageInfo, node::Node, service::*};
14
15/// Client end of a ROS2 Service
16pub struct Client<S>
17where
18  S: Service,
19  S::Request: Message,
20  S::Response: Message,
21{
22  service_mapping: ServiceMapping,
23  request_sender: DataWriterR<RequestWrapper<S::Request>>,
24  response_receiver: SimpleDataReaderR<ResponseWrapper<S::Response>>,
25  sequence_number_gen: atomic::AtomicI64, // used by basic and cyclone
26  client_guid: GUID,                      // used by the Cyclone ServiceMapping
27}
28
29impl<S> Client<S>
30where
31  S: 'static + Service,
32{
33  pub(crate) fn new(
34    service_mapping: ServiceMapping,
35    node: &mut Node,
36    request_topic: &Topic,
37    response_topic: &Topic,
38    qos_request: Option<QosPolicies>,
39    qos_response: Option<QosPolicies>,
40  ) -> CreateResult<Self> {
41    let request_sender =
42      node.create_datawriter
43      ::<RequestWrapper<S::Request>, ServiceSerializerAdapter<RequestWrapper<S::Request>>>(
44        request_topic, qos_request)?;
45    let response_receiver =
46      node.create_simpledatareader
47      ::<ResponseWrapper<S::Response>, ServiceDeserializerAdapter<ResponseWrapper<S::Response>>>(
48        response_topic, qos_response)?;
49
50    debug!(
51      "Created new Client: request={} response={}",
52      request_topic.name(),
53      response_topic.name()
54    );
55    let client_guid = request_sender.guid();
56    Ok(Client::<S> {
57      service_mapping,
58      request_sender,
59      response_receiver,
60      sequence_number_gen: atomic::AtomicI64::new(SequenceNumber::default().into()),
61      client_guid,
62    })
63  }
64
65  /// Send a request to Service Server.
66  /// The returned `RmwRequestId` is a token to identify the correct response.
67  pub fn send_request(&self, request: S::Request) -> WriteResult<RmwRequestId, ()> {
68    self.increment_sequence_number();
69    let gen_rmw_req_id = RmwRequestId {
70      writer_guid: self.client_guid,
71      sequence_number: self.sequence_number(),
72    };
73    let req_wrapper = RequestWrapper::<S::Request>::new(
74      self.service_mapping,
75      gen_rmw_req_id,
76      RepresentationIdentifier::CDR_LE,
77      request,
78    )?;
79    let write_opts_builder = WriteOptionsBuilder::new().source_timestamp(Timestamp::now()); // always add source timestamp
80
81    let write_opts_builder = if self.service_mapping == ServiceMapping::Enhanced {
82      write_opts_builder
83    } else {
84      write_opts_builder.related_sample_identity(SampleIdentity::from(gen_rmw_req_id))
85    };
86    let sent_rmw_req_id = self
87      .request_sender
88      .write_with_options(req_wrapper, write_opts_builder.build())
89      .map(RmwRequestId::from)
90      .map_err(|e| e.forget_data())?;
91
92    match self.service_mapping {
93      ServiceMapping::Enhanced => Ok(sent_rmw_req_id),
94      ServiceMapping::Basic | ServiceMapping::Cyclone => Ok(gen_rmw_req_id),
95    }
96  }
97
98  /// Receive a response from Server
99  /// Returns `Ok(None)` if no new responses have arrived.
100  /// Note: The response may to someone else's request. Check received
101  /// `RmWRequestId` against the one you got when sending request to identify
102  /// the correct response. In case you receive someone else's response,
103  /// please do receive again.
104  pub fn receive_response(&self) -> ReadResult<Option<(RmwRequestId, S::Response)>> {
105    self.response_receiver.drain_read_notifications();
106    let dcc_rw: Option<no_key::DeserializedCacheChange<ResponseWrapper<S::Response>>> =
107      self.response_receiver.try_take_one()?;
108
109    match dcc_rw {
110      None => Ok(None),
111      Some(dcc) => {
112        let mi = MessageInfo::from(&dcc);
113        let res_wrapper = dcc.into_value();
114        let (ri, res) = res_wrapper.unwrap(self.service_mapping, mi, self.client_guid)?;
115        Ok(Some((ri, res)))
116      }
117    } // match
118  }
119
120  /// Send a request to Service Server asynchronously.
121  /// The returned `RmwRequestId` is a token to identify the correct response.
122  pub async fn async_send_request(&self, request: S::Request) -> WriteResult<RmwRequestId, ()> {
123    let gen_rmw_req_id =
124      // we do the req_id generation in an async block so that we do not generate
125      // multiple sequence numbers if there are multiple polls to this function
126      async {
127        self.increment_sequence_number();
128         RmwRequestId {
129          writer_guid: self.client_guid,
130          sequence_number: self.sequence_number(),
131        }
132      }.await;
133
134    let req_wrapper = RequestWrapper::<S::Request>::new(
135      self.service_mapping,
136      gen_rmw_req_id,
137      RepresentationIdentifier::CDR_LE,
138      request,
139    )?;
140    let write_opts_builder = WriteOptionsBuilder::new().source_timestamp(Timestamp::now()); // always add source timestamp
141
142    let write_opts_builder = if self.service_mapping == ServiceMapping::Enhanced {
143      write_opts_builder
144    } else {
145      write_opts_builder.related_sample_identity(SampleIdentity::from(gen_rmw_req_id))
146    };
147    let sent_rmw_req_id = self
148      .request_sender
149      .async_write_with_options(req_wrapper, write_opts_builder.build())
150      .await
151      .map(RmwRequestId::from)
152      .map_err(|e| e.forget_data())?;
153
154    let req_id = match self.service_mapping {
155      ServiceMapping::Enhanced => sent_rmw_req_id,
156      ServiceMapping::Basic | ServiceMapping::Cyclone => gen_rmw_req_id,
157    };
158    debug!(
159      "Sent Request {:?} to {:?}",
160      req_id,
161      self.request_sender.topic().name()
162    );
163    Ok(req_id)
164  }
165
166  /// Receive a response from Server
167  /// The returned Future does not complete until the response has been
168  /// received.
169  pub async fn async_receive_response(&self, request_id: RmwRequestId) -> ReadResult<S::Response> {
170    let dcc_stream = self.response_receiver.as_async_stream();
171    pin_mut!(dcc_stream);
172
173    loop {
174      match dcc_stream.next().await {
175        Some(Err(e)) => return Err(e),
176        Some(Ok(dcc)) => {
177          let mi = MessageInfo::from(&dcc);
178          let (req_id, response) =
179            dcc
180              .into_value()
181              .unwrap(self.service_mapping, mi, self.client_guid)?;
182          if req_id == request_id {
183            return Ok(response);
184          } else {
185            debug!(
186              "Received response for someone else. expected={:?}  received={:?}",
187              request_id, req_id
188            );
189            continue; //
190          }
191        }
192        // This should never occur, because topic do not "end".
193        None => return read_error_internal!("SimpleDataReader value stream unexpectedly ended!"),
194      }
195    } // loop
196  }
197
198  pub async fn async_call_service(
199    &self,
200    request: S::Request,
201  ) -> Result<S::Response, CallServiceError<()>> {
202    let req_id = self.async_send_request(request).await?;
203    self
204      .async_receive_response(req_id)
205      .await
206      .map_err(CallServiceError::from)
207  }
208
209  /// Wait for a Server to be connected to the Request and Response topics.
210  ///
211  /// This does not distinguish between diagnostinc tools and actual servers.
212  /// It is enough that someone has subscribed the Requests, and someone is
213  /// a publisher for Responses.
214  ///
215  /// May panic, if the Node does not havea background Spinner running.
216  pub async fn wait_for_service(&self, my_node: &Node) {
217    join!(
218      my_node.wait_for_reader(self.request_sender.guid()),
219      my_node.wait_for_writer(self.response_receiver.guid())
220    );
221  }
222
223  fn increment_sequence_number(&self) {
224    self
225      .sequence_number_gen
226      .fetch_add(1, atomic::Ordering::Acquire);
227  }
228
229  fn sequence_number(&self) -> request_id::SequenceNumber {
230    self
231      .sequence_number_gen
232      .load(atomic::Ordering::Acquire)
233      .into()
234  }
235}
236
237#[derive(Debug)]
238pub enum CallServiceError<T> {
239  WriteError(WriteError<T>),
240  ReadError(ReadError),
241}
242impl<T> From<WriteError<T>> for CallServiceError<T> {
243  fn from(value: WriteError<T>) -> Self {
244    CallServiceError::WriteError(value)
245  }
246}
247impl<T> From<ReadError> for CallServiceError<T> {
248  fn from(value: ReadError) -> Self {
249    CallServiceError::ReadError(value)
250  }
251}
252
253impl<S> Evented for Client<S>
254where
255  S: 'static + Service,
256{
257  fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
258    self.response_receiver.register(poll, token, interest, opts)
259  }
260
261  fn reregister(
262    &self,
263    poll: &Poll,
264    token: Token,
265    interest: Ready,
266    opts: PollOpt,
267  ) -> io::Result<()> {
268    self
269      .response_receiver
270      .reregister(poll, token, interest, opts)
271  }
272
273  fn deregister(&self, poll: &Poll) -> io::Result<()> {
274    self.response_receiver.deregister(poll)
275  }
276}