1use std::io;
2
3use mio::{Evented, Poll, PollOpt, Ready, Token};
4#[allow(unused_imports)]
5use log::{debug, error, info, warn};
6use futures::{pin_mut, stream::FusedStream, StreamExt};
7use rustdds::{
8 dds::{CreateResult, ReadError, ReadResult, WriteResult},
9 rpc::*,
10 *,
11};
12
13use crate::{message_info::MessageInfo, node::Node, service::*};
14
15pub struct Server<S>
19where
20 S: Service,
21 S::Request: Message,
22 S::Response: Message,
23{
24 service_mapping: ServiceMapping,
25 request_receiver: SimpleDataReaderR<RequestWrapper<S::Request>>,
26 response_sender: DataWriterR<ResponseWrapper<S::Response>>,
27}
28
29impl<S> Server<S>
30where
31 S: 'static + Service,
32{
33 pub(crate) fn new(
34 service_mapping: ServiceMapping,
35 node: &mut Node,
36 request_topic: &Topic,
37 response_topic: &Topic,
38 qos_request: Option<QosPolicies>,
39 qos_response: Option<QosPolicies>,
40 ) -> CreateResult<Self> {
41 let request_receiver =
42 node.create_simpledatareader
43 ::<RequestWrapper<S::Request>, ServiceDeserializerAdapter<RequestWrapper<S::Request>>>(
44 request_topic, qos_request)?;
45 let response_sender =
46 node.create_datawriter
47 ::<ResponseWrapper<S::Response>, ServiceSerializerAdapter<ResponseWrapper<S::Response>>>(
48 response_topic, qos_response)?;
49
50 debug!(
51 "Created new Server: requests={} response={}",
52 request_topic.name(),
53 response_topic.name()
54 );
55
56 Ok(Server::<S> {
57 service_mapping,
58 request_receiver,
59 response_sender,
60 })
61 }
62
63 pub fn receive_request(&self) -> ReadResult<Option<(RmwRequestId, S::Request)>> {
66 self.request_receiver.drain_read_notifications();
67 let dcc_rw: Option<no_key::DeserializedCacheChange<RequestWrapper<S::Request>>> =
68 self.request_receiver.try_take_one()?;
69
70 match dcc_rw {
71 None => Ok(None),
72 Some(dcc) => {
73 let mi = MessageInfo::from(&dcc);
74 let req_wrapper = dcc.into_value();
75 let (ri, req) = req_wrapper.unwrap(self.service_mapping, &mi)?;
76 Ok(Some((ri, req)))
77 }
78 } }
80
81 pub fn send_response(
84 &self,
85 rmw_req_id: RmwRequestId,
86 response: S::Response,
87 ) -> WriteResult<(), ()> {
88 let resp_wrapper = ResponseWrapper::<S::Response>::new(
89 self.service_mapping,
90 rmw_req_id,
91 RepresentationIdentifier::CDR_LE,
92 response,
93 )?;
94 let write_opts = WriteOptionsBuilder::new()
95 .source_timestamp(Timestamp::now()) .related_sample_identity(SampleIdentity::from(rmw_req_id))
97 .build();
102 self
103 .response_sender
104 .write_with_options(resp_wrapper, write_opts)
105 .map(|_| ())
106 .map_err(|e| e.forget_data()) }
108
109 pub async fn async_receive_request(&self) -> ReadResult<(RmwRequestId, S::Request)> {
112 let dcc_stream = self.request_receiver.as_async_stream();
113 pin_mut!(dcc_stream);
114
115 match dcc_stream.next().await {
116 Some(Err(e)) => Err(e),
117 Some(Ok(dcc)) => {
118 let mi = MessageInfo::from(&dcc);
119 let req_wrapper = dcc.into_value();
120 let (ri, req) = req_wrapper.unwrap(self.service_mapping, &mi)?;
121 debug!("async_receive_request: {ri:?}");
122 Ok((ri, req))
123 }
124 None => read_error_internal!("SimpleDataReader value stream unexpectedly ended!"),
126 } }
128
129 pub fn receive_request_stream(
133 &self,
134 ) -> impl FusedStream<Item = ReadResult<(RmwRequestId, S::Request)>> + '_ {
135 Box::pin(self.request_receiver.as_async_stream().then(
136 move |dcc_r| async move {
137 match dcc_r {
138 Err(e) => Err(e),
139 Ok(dcc) => {
140 let mi = MessageInfo::from(&dcc);
141 let req_wrapper = dcc.into_value();
142 debug!("receive_request_stream: messageinfo={mi:?}");
143 req_wrapper.unwrap(self.service_mapping, &mi)
144 }
145 } }, ))
148 }
149
150 pub async fn async_send_response(
152 &self,
153 rmw_req_id: RmwRequestId,
154 response: S::Response,
155 ) -> dds::WriteResult<(), ()> {
156 let resp_wrapper = ResponseWrapper::<S::Response>::new(
157 self.service_mapping,
158 rmw_req_id,
159 RepresentationIdentifier::CDR_LE,
160 response,
161 )?;
162 debug!("async_send_response: rmw_req_id = {rmw_req_id:?}");
163 debug!(
164 "async_send_response: related_sample_identity = {:?}",
165 SampleIdentity::from(rmw_req_id)
166 );
167 let write_opts = WriteOptionsBuilder::new()
168 .source_timestamp(Timestamp::now()) .related_sample_identity(SampleIdentity::from(rmw_req_id))
170 .build();
175 self
176 .response_sender
177 .async_write_with_options(resp_wrapper, write_opts)
178 .await
179 .map(|_| ())
180 .map_err(|e| e.forget_data()) }
182}
183
184impl<S> Evented for Server<S>
185where
186 S: 'static + Service,
187{
188 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
189 self.request_receiver.register(poll, token, interest, opts)
190 }
191
192 fn reregister(
193 &self,
194 poll: &Poll,
195 token: Token,
196 interest: Ready,
197 opts: PollOpt,
198 ) -> io::Result<()> {
199 self
200 .request_receiver
201 .reregister(poll, token, interest, opts)
202 }
203
204 fn deregister(&self, poll: &Poll) -> io::Result<()> {
205 self.request_receiver.deregister(poll)
206 }
207}