1use std::{io, sync::atomic};
2
3use mio::{Evented, Poll, PollOpt, Ready, Token};
4#[allow(unused_imports)]
5use log::{debug, error, info, warn};
6use futures::{join, pin_mut, StreamExt};
7use rustdds::{
8 dds::{CreateResult, ReadError, ReadResult, WriteError, WriteResult},
9 rpc::*,
10 *,
11};
12
13use crate::{message_info::MessageInfo, node::Node, service::*};
14
15pub struct Client<S>
17where
18 S: Service,
19 S::Request: Message,
20 S::Response: Message,
21{
22 service_mapping: ServiceMapping,
23 request_sender: DataWriterR<RequestWrapper<S::Request>>,
24 response_receiver: SimpleDataReaderR<ResponseWrapper<S::Response>>,
25 sequence_number_gen: atomic::AtomicI64, client_guid: GUID, }
28
29impl<S> Client<S>
30where
31 S: 'static + Service,
32{
33 pub(crate) fn new(
34 service_mapping: ServiceMapping,
35 node: &mut Node,
36 request_topic: &Topic,
37 response_topic: &Topic,
38 qos_request: Option<QosPolicies>,
39 qos_response: Option<QosPolicies>,
40 ) -> CreateResult<Self> {
41 let request_sender =
42 node.create_datawriter
43 ::<RequestWrapper<S::Request>, ServiceSerializerAdapter<RequestWrapper<S::Request>>>(
44 request_topic, qos_request)?;
45 let response_receiver =
46 node.create_simpledatareader
47 ::<ResponseWrapper<S::Response>, ServiceDeserializerAdapter<ResponseWrapper<S::Response>>>(
48 response_topic, qos_response)?;
49
50 debug!(
51 "Created new Client: request={} response={}",
52 request_topic.name(),
53 response_topic.name()
54 );
55 let client_guid = request_sender.guid();
56 Ok(Client::<S> {
57 service_mapping,
58 request_sender,
59 response_receiver,
60 sequence_number_gen: atomic::AtomicI64::new(SequenceNumber::default().into()),
61 client_guid,
62 })
63 }
64
65 pub fn send_request(&self, request: S::Request) -> WriteResult<RmwRequestId, ()> {
68 self.increment_sequence_number();
69 let gen_rmw_req_id = RmwRequestId {
70 writer_guid: self.client_guid,
71 sequence_number: self.sequence_number(),
72 };
73 let req_wrapper = RequestWrapper::<S::Request>::new(
74 self.service_mapping,
75 gen_rmw_req_id,
76 RepresentationIdentifier::CDR_LE,
77 request,
78 )?;
79 let write_opts_builder = WriteOptionsBuilder::new().source_timestamp(Timestamp::now()); let write_opts_builder = if self.service_mapping == ServiceMapping::Enhanced {
82 write_opts_builder
83 } else {
84 write_opts_builder.related_sample_identity(SampleIdentity::from(gen_rmw_req_id))
85 };
86 let sent_rmw_req_id = self
87 .request_sender
88 .write_with_options(req_wrapper, write_opts_builder.build())
89 .map(RmwRequestId::from)
90 .map_err(|e| e.forget_data())?;
91
92 match self.service_mapping {
93 ServiceMapping::Enhanced => Ok(sent_rmw_req_id),
94 ServiceMapping::Basic | ServiceMapping::Cyclone => Ok(gen_rmw_req_id),
95 }
96 }
97
98 pub fn receive_response(&self) -> ReadResult<Option<(RmwRequestId, S::Response)>> {
105 self.response_receiver.drain_read_notifications();
106 let dcc_rw: Option<no_key::DeserializedCacheChange<ResponseWrapper<S::Response>>> =
107 self.response_receiver.try_take_one()?;
108
109 match dcc_rw {
110 None => Ok(None),
111 Some(dcc) => {
112 let mi = MessageInfo::from(&dcc);
113 let res_wrapper = dcc.into_value();
114 let (ri, res) = res_wrapper.unwrap(self.service_mapping, mi, self.client_guid)?;
115 Ok(Some((ri, res)))
116 }
117 } }
119
120 pub async fn async_send_request(&self, request: S::Request) -> WriteResult<RmwRequestId, ()> {
123 let gen_rmw_req_id =
124 async {
127 self.increment_sequence_number();
128 RmwRequestId {
129 writer_guid: self.client_guid,
130 sequence_number: self.sequence_number(),
131 }
132 }.await;
133
134 let req_wrapper = RequestWrapper::<S::Request>::new(
135 self.service_mapping,
136 gen_rmw_req_id,
137 RepresentationIdentifier::CDR_LE,
138 request,
139 )?;
140 let write_opts_builder = WriteOptionsBuilder::new().source_timestamp(Timestamp::now()); let write_opts_builder = if self.service_mapping == ServiceMapping::Enhanced {
143 write_opts_builder
144 } else {
145 write_opts_builder.related_sample_identity(SampleIdentity::from(gen_rmw_req_id))
146 };
147 let sent_rmw_req_id = self
148 .request_sender
149 .async_write_with_options(req_wrapper, write_opts_builder.build())
150 .await
151 .map(RmwRequestId::from)
152 .map_err(|e| e.forget_data())?;
153
154 let req_id = match self.service_mapping {
155 ServiceMapping::Enhanced => sent_rmw_req_id,
156 ServiceMapping::Basic | ServiceMapping::Cyclone => gen_rmw_req_id,
157 };
158 debug!(
159 "Sent Request {:?} to {:?}",
160 req_id,
161 self.request_sender.topic().name()
162 );
163 Ok(req_id)
164 }
165
166 pub async fn async_receive_response(&self, request_id: RmwRequestId) -> ReadResult<S::Response> {
170 let dcc_stream = self.response_receiver.as_async_stream();
171 pin_mut!(dcc_stream);
172
173 loop {
174 match dcc_stream.next().await {
175 Some(Err(e)) => return Err(e),
176 Some(Ok(dcc)) => {
177 let mi = MessageInfo::from(&dcc);
178 let (req_id, response) =
179 dcc
180 .into_value()
181 .unwrap(self.service_mapping, mi, self.client_guid)?;
182 if req_id == request_id {
183 return Ok(response);
184 } else {
185 debug!(
186 "Received response for someone else. expected={:?} received={:?}",
187 request_id, req_id
188 );
189 continue; }
191 }
192 None => return read_error_internal!("SimpleDataReader value stream unexpectedly ended!"),
194 }
195 } }
197
198 pub async fn async_call_service(
199 &self,
200 request: S::Request,
201 ) -> Result<S::Response, CallServiceError<()>> {
202 let req_id = self.async_send_request(request).await?;
203 self
204 .async_receive_response(req_id)
205 .await
206 .map_err(CallServiceError::from)
207 }
208
209 pub async fn wait_for_service(&self, my_node: &Node) {
217 join!(
218 my_node.wait_for_reader(self.request_sender.guid()),
219 my_node.wait_for_writer(self.response_receiver.guid())
220 );
221 }
222
223 fn increment_sequence_number(&self) {
224 self
225 .sequence_number_gen
226 .fetch_add(1, atomic::Ordering::Acquire);
227 }
228
229 fn sequence_number(&self) -> request_id::SequenceNumber {
230 self
231 .sequence_number_gen
232 .load(atomic::Ordering::Acquire)
233 .into()
234 }
235}
236
237#[derive(Debug)]
238pub enum CallServiceError<T> {
239 WriteError(WriteError<T>),
240 ReadError(ReadError),
241}
242impl<T> From<WriteError<T>> for CallServiceError<T> {
243 fn from(value: WriteError<T>) -> Self {
244 CallServiceError::WriteError(value)
245 }
246}
247impl<T> From<ReadError> for CallServiceError<T> {
248 fn from(value: ReadError) -> Self {
249 CallServiceError::ReadError(value)
250 }
251}
252
253impl<S> Evented for Client<S>
254where
255 S: 'static + Service,
256{
257 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
258 self.response_receiver.register(poll, token, interest, opts)
259 }
260
261 fn reregister(
262 &self,
263 poll: &Poll,
264 token: Token,
265 interest: Ready,
266 opts: PollOpt,
267 ) -> io::Result<()> {
268 self
269 .response_receiver
270 .reregister(poll, token, interest, opts)
271 }
272
273 fn deregister(&self, poll: &Poll) -> io::Result<()> {
274 self.response_receiver.deregister(poll)
275 }
276}