1use std::{io, sync::atomic};
2
3use mio::{Evented, Poll, PollOpt, Ready, Token};
4#[allow(unused_imports)]
5use log::{debug, error, info, warn};
6use futures::{join, pin_mut, StreamExt};
7use rustdds::{
8 dds::{CreateResult, ReadError, ReadResult, WriteError, WriteResult},
9 rpc::*,
10 *,
11};
12
13use crate::{message_info::MessageInfo, node::Node, service::*};
14
15pub struct Client<S>
17where
18 S: Service,
19 S::Request: Message,
20 S::Response: Message,
21{
22 service_mapping: ServiceMapping,
23 request_sender: DataWriterR<RequestWrapper<S::Request>>,
24 response_receiver: SimpleDataReaderR<ResponseWrapper<S::Response>>,
25 sequence_number_gen: atomic::AtomicI64, client_guid: GUID, }
28
29impl<S> Client<S>
30where
31 S: 'static + Service,
32{
33 pub(crate) fn new(
34 service_mapping: ServiceMapping,
35 node: &mut Node,
36 request_topic: &Topic,
37 response_topic: &Topic,
38 qos_request: Option<QosPolicies>,
39 qos_response: Option<QosPolicies>,
40 ) -> CreateResult<Self> {
41 let request_sender =
42 node.create_datawriter
43 ::<RequestWrapper<S::Request>, ServiceSerializerAdapter<RequestWrapper<S::Request>>>(
44 request_topic, qos_request)?;
45 let response_receiver =
46 node.create_simpledatareader
47 ::<ResponseWrapper<S::Response>, ServiceDeserializerAdapter<ResponseWrapper<S::Response>>>(
48 response_topic, qos_response)?;
49
50 debug!(
51 "Created new Client: request={} response={}",
52 request_topic.name(),
53 response_topic.name()
54 );
55 let client_guid = request_sender.guid();
56 Ok(Client::<S> {
57 service_mapping,
58 request_sender,
59 response_receiver,
60 sequence_number_gen: atomic::AtomicI64::new(SequenceNumber::default().into()),
61 client_guid,
62 })
63 }
64
65 pub fn send_request(&self, request: S::Request) -> WriteResult<RmwRequestId, ()> {
68 self.increment_sequence_number();
69 let gen_rmw_req_id = RmwRequestId {
70 writer_guid: self.client_guid,
71 sequence_number: self.sequence_number(),
72 };
73 let req_wrapper = RequestWrapper::<S::Request>::new(
74 self.service_mapping,
75 gen_rmw_req_id,
76 RepresentationIdentifier::CDR_LE,
77 request,
78 )?;
79 let write_opts_builder = WriteOptionsBuilder::new().source_timestamp(Timestamp::now()); let write_opts_builder = if self.service_mapping == ServiceMapping::Enhanced {
82 write_opts_builder
83 } else {
84 write_opts_builder.related_sample_identity(SampleIdentity::from(gen_rmw_req_id))
85 };
86 let sent_rmw_req_id = self
87 .request_sender
88 .write_with_options(req_wrapper, write_opts_builder.build())
89 .map(RmwRequestId::from)
90 .map_err(|e| e.forget_data())?;
91
92 match self.service_mapping {
93 ServiceMapping::Enhanced => Ok(sent_rmw_req_id),
94 ServiceMapping::Basic | ServiceMapping::Cyclone => Ok(gen_rmw_req_id),
95 }
96 }
97
98 pub fn receive_response(&self) -> ReadResult<Option<(RmwRequestId, S::Response)>> {
105 self.response_receiver.drain_read_notifications();
106 let dcc_rw: Option<no_key::DeserializedCacheChange<ResponseWrapper<S::Response>>> =
107 self.response_receiver.try_take_one()?;
108
109 match dcc_rw {
110 None => Ok(None),
111 Some(dcc) => {
112 let mi = MessageInfo::from(&dcc);
113 let res_wrapper = dcc.into_value();
114 let (ri, res) = res_wrapper.unwrap(self.service_mapping, mi, self.client_guid)?;
115 Ok(Some((ri, res)))
116 }
117 } }
119
120 pub async fn async_send_request(&self, request: S::Request) -> WriteResult<RmwRequestId, ()> {
123 let gen_rmw_req_id =
124 async {
127 self.increment_sequence_number();
128 RmwRequestId {
129 writer_guid: self.client_guid,
130 sequence_number: self.sequence_number(),
131 }
132 }.await;
133
134 let req_wrapper = RequestWrapper::<S::Request>::new(
135 self.service_mapping,
136 gen_rmw_req_id,
137 RepresentationIdentifier::CDR_LE,
138 request,
139 )?;
140 let write_opts_builder = WriteOptionsBuilder::new().source_timestamp(Timestamp::now()); let write_opts_builder = if self.service_mapping == ServiceMapping::Enhanced {
143 write_opts_builder
144 } else {
145 write_opts_builder.related_sample_identity(SampleIdentity::from(gen_rmw_req_id))
146 };
147 let sent_rmw_req_id = self
148 .request_sender
149 .async_write_with_options(req_wrapper, write_opts_builder.build())
150 .await
151 .map(RmwRequestId::from)
152 .map_err(|e| e.forget_data())?;
153
154 let req_id = match self.service_mapping {
155 ServiceMapping::Enhanced => sent_rmw_req_id,
156 ServiceMapping::Basic | ServiceMapping::Cyclone => gen_rmw_req_id,
157 };
158 debug!(
159 "Sent Request {:?} to {:?}",
160 req_id,
161 self.request_sender.topic().name()
162 );
163 Ok(req_id)
164 }
165
166 pub async fn async_receive_response(&self, request_id: RmwRequestId) -> ReadResult<S::Response> {
170 let dcc_stream = self.response_receiver.as_async_stream();
171 pin_mut!(dcc_stream);
172
173 loop {
174 match dcc_stream.next().await {
175 Some(Err(e)) => return Err(e),
176 Some(Ok(dcc)) => {
177 let mi = MessageInfo::from(&dcc);
178 let (req_id, response) =
179 dcc
180 .into_value()
181 .unwrap(self.service_mapping, mi, self.client_guid)?;
182 if req_id == request_id {
183 return Ok(response);
184 } else {
185 debug!(
186 "Received response for someone else. expected={request_id:?} received={req_id:?}"
187 );
188 continue; }
190 }
191 None => return read_error_internal!("SimpleDataReader value stream unexpectedly ended!"),
193 }
194 } }
196
197 pub async fn async_call_service(
198 &self,
199 request: S::Request,
200 ) -> Result<S::Response, CallServiceError<()>> {
201 let req_id = self.async_send_request(request).await?;
202 self
203 .async_receive_response(req_id)
204 .await
205 .map_err(CallServiceError::from)
206 }
207
208 pub async fn wait_for_service(&self, my_node: &Node) {
216 join!(
217 my_node.wait_for_reader(self.request_sender.guid()),
218 my_node.wait_for_writer(self.response_receiver.guid())
219 );
220 }
221
222 fn increment_sequence_number(&self) {
223 self
224 .sequence_number_gen
225 .fetch_add(1, atomic::Ordering::Acquire);
226 }
227
228 fn sequence_number(&self) -> request_id::SequenceNumber {
229 self
230 .sequence_number_gen
231 .load(atomic::Ordering::Acquire)
232 .into()
233 }
234}
235
236#[derive(Debug)]
237pub enum CallServiceError<T> {
238 WriteError(WriteError<T>),
239 ReadError(ReadError),
240}
241impl<T> From<WriteError<T>> for CallServiceError<T> {
242 fn from(value: WriteError<T>) -> Self {
243 CallServiceError::WriteError(value)
244 }
245}
246impl<T> From<ReadError> for CallServiceError<T> {
247 fn from(value: ReadError) -> Self {
248 CallServiceError::ReadError(value)
249 }
250}
251
252impl<S> Evented for Client<S>
253where
254 S: 'static + Service,
255{
256 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
257 self.response_receiver.register(poll, token, interest, opts)
258 }
259
260 fn reregister(
261 &self,
262 poll: &Poll,
263 token: Token,
264 interest: Ready,
265 opts: PollOpt,
266 ) -> io::Result<()> {
267 self
268 .response_receiver
269 .reregister(poll, token, interest, opts)
270 }
271
272 fn deregister(&self, poll: &Poll) -> io::Result<()> {
273 self.response_receiver.deregister(poll)
274 }
275}