ros2_client/service/
server.rs

1use std::io;
2
3use mio::{Evented, Poll, PollOpt, Ready, Token};
4#[allow(unused_imports)]
5use log::{debug, error, info, warn};
6use futures::{pin_mut, stream::FusedStream, StreamExt};
7use rustdds::{
8  dds::{CreateResult, ReadError, ReadResult, WriteResult},
9  rpc::*,
10  *,
11};
12
13use crate::{message_info::MessageInfo, node::Node, service::*};
14
15// --------------------------------------------
16// --------------------------------------------
17/// Server end of a ROS2 Service
18pub struct Server<S>
19where
20  S: Service,
21  S::Request: Message,
22  S::Response: Message,
23{
24  service_mapping: ServiceMapping,
25  request_receiver: SimpleDataReaderR<RequestWrapper<S::Request>>,
26  response_sender: DataWriterR<ResponseWrapper<S::Response>>,
27}
28
29impl<S> Server<S>
30where
31  S: 'static + Service,
32{
33  pub(crate) fn new(
34    service_mapping: ServiceMapping,
35    node: &mut Node,
36    request_topic: &Topic,
37    response_topic: &Topic,
38    qos_request: Option<QosPolicies>,
39    qos_response: Option<QosPolicies>,
40  ) -> CreateResult<Self> {
41    let request_receiver =
42      node.create_simpledatareader
43      ::<RequestWrapper<S::Request>, ServiceDeserializerAdapter<RequestWrapper<S::Request>>>(
44        request_topic, qos_request)?;
45    let response_sender =
46      node.create_datawriter
47      ::<ResponseWrapper<S::Response>, ServiceSerializerAdapter<ResponseWrapper<S::Response>>>(
48        response_topic, qos_response)?;
49
50    debug!(
51      "Created new Server: requests={} response={}",
52      request_topic.name(),
53      response_topic.name()
54    );
55
56    Ok(Server::<S> {
57      service_mapping,
58      request_receiver,
59      response_sender,
60    })
61  }
62
63  /// Receive a request from Client.
64  /// Returns `Ok(None)` if no new requests have arrived.
65  pub fn receive_request(&self) -> ReadResult<Option<(RmwRequestId, S::Request)>> {
66    self.request_receiver.drain_read_notifications();
67    let dcc_rw: Option<no_key::DeserializedCacheChange<RequestWrapper<S::Request>>> =
68      self.request_receiver.try_take_one()?;
69
70    match dcc_rw {
71      None => Ok(None),
72      Some(dcc) => {
73        let mi = MessageInfo::from(&dcc);
74        let req_wrapper = dcc.into_value();
75        let (ri, req) = req_wrapper.unwrap(self.service_mapping, &mi)?;
76        Ok(Some((ri, req)))
77      }
78    } // match
79  }
80
81  /// Send response to request by Client.
82  /// rmw_req_id identifies request being responded.
83  pub fn send_response(
84    &self,
85    rmw_req_id: RmwRequestId,
86    response: S::Response,
87  ) -> WriteResult<(), ()> {
88    let resp_wrapper = ResponseWrapper::<S::Response>::new(
89      self.service_mapping,
90      rmw_req_id,
91      RepresentationIdentifier::CDR_LE,
92      response,
93    )?;
94    let write_opts = WriteOptionsBuilder::new()
95      .source_timestamp(Timestamp::now()) // always add source timestamp
96      .related_sample_identity(SampleIdentity::from(rmw_req_id))
97      // TODO: Check if this is right. Cyclone mapping does not send
98      // Related Sample Identity in
99      // WriteOptions (QoS ParameterList), but within data payload.
100      // But maybe it is not harmful to send it in both?
101      .build();
102    self
103      .response_sender
104      .write_with_options(resp_wrapper, write_opts)
105      .map(|_| ())
106      .map_err(|e| e.forget_data()) // lose SampleIdentity result
107  }
108
109  /// The request_id must be sent back with the response to identify which
110  /// request and response belong together.
111  pub async fn async_receive_request(&self) -> ReadResult<(RmwRequestId, S::Request)> {
112    let dcc_stream = self.request_receiver.as_async_stream();
113    pin_mut!(dcc_stream);
114
115    match dcc_stream.next().await {
116      Some(Err(e)) => Err(e),
117      Some(Ok(dcc)) => {
118        let mi = MessageInfo::from(&dcc);
119        let req_wrapper = dcc.into_value();
120        let (ri, req) = req_wrapper.unwrap(self.service_mapping, &mi)?;
121        debug!("async_receive_request: {ri:?}");
122        Ok((ri, req))
123      }
124      // This should never occur, because topic do not "end".
125      None => read_error_internal!("SimpleDataReader value stream unexpectedly ended!"),
126    } // match
127  }
128
129  /// Returns a never-ending stream of (request_id, request)
130  /// The request_id must be sent back with the response to identify which
131  /// request and response belong together.
132  pub fn receive_request_stream(
133    &self,
134  ) -> impl FusedStream<Item = ReadResult<(RmwRequestId, S::Request)>> + '_ {
135    Box::pin(self.request_receiver.as_async_stream().then(
136      move |dcc_r| async move {
137        match dcc_r {
138          Err(e) => Err(e),
139          Ok(dcc) => {
140            let mi = MessageInfo::from(&dcc);
141            let req_wrapper = dcc.into_value();
142            debug!("receive_request_stream: messageinfo={mi:?}");
143            req_wrapper.unwrap(self.service_mapping, &mi)
144          }
145        } // match
146      }, // async
147    ))
148  }
149
150  /// Asynchronous response sending
151  pub async fn async_send_response(
152    &self,
153    rmw_req_id: RmwRequestId,
154    response: S::Response,
155  ) -> dds::WriteResult<(), ()> {
156    let resp_wrapper = ResponseWrapper::<S::Response>::new(
157      self.service_mapping,
158      rmw_req_id,
159      RepresentationIdentifier::CDR_LE,
160      response,
161    )?;
162    debug!("async_send_response: rmw_req_id = {rmw_req_id:?}");
163    debug!(
164      "async_send_response: related_sample_identity = {:?}",
165      SampleIdentity::from(rmw_req_id)
166    );
167    let write_opts = WriteOptionsBuilder::new()
168      .source_timestamp(Timestamp::now()) // always add source timestamp
169      .related_sample_identity(SampleIdentity::from(rmw_req_id))
170      // TODO: Check if this is right. Cyclone mapping does not send
171      // Related Sample Identity in
172      // WriteOptions (QoS ParameterList), but within data payload.
173      // But maybe it is not harmful to send it in both?
174      .build();
175    self
176      .response_sender
177      .async_write_with_options(resp_wrapper, write_opts)
178      .await
179      .map(|_| ())
180      .map_err(|e| e.forget_data()) // lose SampleIdentity result
181  }
182}
183
184impl<S> Evented for Server<S>
185where
186  S: 'static + Service,
187{
188  fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
189    self.request_receiver.register(poll, token, interest, opts)
190  }
191
192  fn reregister(
193    &self,
194    poll: &Poll,
195    token: Token,
196    interest: Ready,
197    opts: PollOpt,
198  ) -> io::Result<()> {
199    self
200      .request_receiver
201      .reregister(poll, token, interest, opts)
202  }
203
204  fn deregister(&self, poll: &Poll) -> io::Result<()> {
205    self.request_receiver.deregister(poll)
206  }
207}