do_not_use_testing_rclrs/
client.rs1use std::boxed::Box;
2use std::collections::HashMap;
3use std::ffi::CString;
4use std::sync::atomic::AtomicBool;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::channel::oneshot;
8use rosidl_runtime_rs::Message;
9
10use crate::error::{RclReturnCode, ToResult};
11use crate::MessageCow;
12use crate::{rcl_bindings::*, RclrsError};
13
14unsafe impl Send for rcl_client_t {}
17
18pub struct ClientHandle {
20 rcl_client_mtx: Mutex<rcl_client_t>,
21 rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
22 pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
23}
24
25impl ClientHandle {
26 pub(crate) fn lock(&self) -> MutexGuard<rcl_client_t> {
27 self.rcl_client_mtx.lock().unwrap()
28 }
29}
30
31impl Drop for ClientHandle {
32 fn drop(&mut self) {
33 let rcl_client = self.rcl_client_mtx.get_mut().unwrap();
34 let rcl_node_mtx = &mut *self.rcl_node_mtx.lock().unwrap();
35 unsafe {
37 rcl_client_fini(rcl_client, rcl_node_mtx);
38 }
39 }
40}
41
42pub trait ClientBase: Send + Sync {
46 fn handle(&self) -> &ClientHandle;
48 fn execute(&self) -> Result<(), RclrsError>;
50}
51
/// Boxed one-shot callback that consumes the response to a request.
type RequestValue<Response> = Box<dyn FnOnce(Response) + Send + 'static>;

/// Key under which pending requests are stored: the sequence number written by
/// `rcl_send_request` and reported back in `rmw_request_id_t::sequence_number`.
type RequestId = i64;
55
56pub struct Client<T>
64where
65 T: rosidl_runtime_rs::Service,
66{
67 pub(crate) handle: Arc<ClientHandle>,
68 requests: Mutex<HashMap<RequestId, RequestValue<T::Response>>>,
69 futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
70}
71
72impl<T> Client<T>
73where
74 T: rosidl_runtime_rs::Service,
75{
76 pub(crate) fn new(rcl_node_mtx: Arc<Mutex<rcl_node_t>>, topic: &str) -> Result<Self, RclrsError>
78 where
81 T: rosidl_runtime_rs::Service,
82 {
83 let mut rcl_client = unsafe { rcl_get_zero_initialized_client() };
85 let type_support = <T as rosidl_runtime_rs::Service>::get_type_support()
86 as *const rosidl_service_type_support_t;
87 let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
88 err,
89 s: topic.into(),
90 })?;
91
92 let client_options = unsafe { rcl_client_get_default_options() };
94
95 unsafe {
96 rcl_client_init(
101 &mut rcl_client,
102 &*rcl_node_mtx.lock().unwrap(),
103 type_support,
104 topic_c_string.as_ptr(),
105 &client_options,
106 )
107 .ok()?;
108 }
109
110 let handle = Arc::new(ClientHandle {
111 rcl_client_mtx: Mutex::new(rcl_client),
112 rcl_node_mtx,
113 in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
114 });
115
116 Ok(Self {
117 handle,
118 requests: Mutex::new(HashMap::new()),
119 futures: Arc::new(Mutex::new(
120 HashMap::<RequestId, oneshot::Sender<T::Response>>::new(),
121 )),
122 })
123 }
124
125 pub fn async_send_request_with_callback<'a, M: MessageCow<'a, T::Request>, F>(
138 &self,
139 message: M,
140 callback: F,
141 ) -> Result<(), RclrsError>
142 where
143 F: FnOnce(T::Response) + 'static + Send,
144 {
145 let rmw_message = T::Request::into_rmw_message(message.into_cow());
146 let mut sequence_number = -1;
147 unsafe {
148 rcl_send_request(
150 &*self.handle.lock() as *const _,
151 rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
152 &mut sequence_number,
153 )
154 }
155 .ok()?;
156 let requests = &mut *self.requests.lock().unwrap();
157 requests.insert(sequence_number, Box::new(callback));
158 Ok(())
159 }
160
161 pub async fn call_async<'a, R: MessageCow<'a, T::Request>>(
174 &self,
175 request: R,
176 ) -> Result<T::Response, RclrsError>
177 where
178 T: rosidl_runtime_rs::Service,
179 {
180 let rmw_message = T::Request::into_rmw_message(request.into_cow());
181 let mut sequence_number = -1;
182 unsafe {
183 rcl_send_request(
185 &*self.handle.lock() as *const _,
186 rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
187 &mut sequence_number,
188 )
189 }
190 .ok()?;
191 let (tx, rx) = oneshot::channel::<T::Response>();
192 self.futures.lock().unwrap().insert(sequence_number, tx);
193 Ok(rx.await.unwrap())
197 }
198
199 pub fn take_response(&self) -> Result<(T::Response, rmw_request_id_t), RclrsError> {
222 let mut request_id_out = rmw_request_id_t {
223 writer_guid: [0; 16],
224 sequence_number: 0,
225 };
226 type RmwMsg<T> =
227 <<T as rosidl_runtime_rs::Service>::Response as rosidl_runtime_rs::Message>::RmwMsg;
228 let mut response_out = RmwMsg::<T>::default();
229 let handle = &*self.handle.lock();
230 unsafe {
231 rcl_take_response(
233 handle,
234 &mut request_id_out,
235 &mut response_out as *mut RmwMsg<T> as *mut _,
236 )
237 }
238 .ok()?;
239 Ok((T::Response::from_rmw_message(response_out), request_id_out))
240 }
241
242 pub fn service_is_ready(&self) -> Result<bool, RclrsError> {
247 let mut is_ready = false;
248 let client = &mut *self.handle.rcl_client_mtx.lock().unwrap();
249 let node = &mut *self.handle.rcl_node_mtx.lock().unwrap();
250
251 unsafe {
252 rcl_service_server_is_available(node as *const _, client as *const _, &mut is_ready)
255 }
256 .ok()?;
257 Ok(is_ready)
258 }
259}
260
261impl<T> ClientBase for Client<T>
262where
263 T: rosidl_runtime_rs::Service,
264{
265 fn handle(&self) -> &ClientHandle {
266 &self.handle
267 }
268
269 fn execute(&self) -> Result<(), RclrsError> {
270 let (res, req_id) = match self.take_response() {
271 Ok((res, req_id)) => (res, req_id),
272 Err(RclrsError::RclError {
273 code: RclReturnCode::ClientTakeFailed,
274 ..
275 }) => {
276 return Ok(());
279 }
280 Err(e) => return Err(e),
281 };
282 let requests = &mut *self.requests.lock().unwrap();
283 let futures = &mut *self.futures.lock().unwrap();
284 if let Some(callback) = requests.remove(&req_id.sequence_number) {
285 callback(res);
286 } else if let Some(future) = futures.remove(&req_id.sequence_number) {
287 let _ = future.send(res);
288 }
289 Ok(())
290 }
291}