do_not_use_testing_rclrs/
client.rs

1use std::boxed::Box;
2use std::collections::HashMap;
3use std::ffi::CString;
4use std::sync::atomic::AtomicBool;
5use std::sync::{Arc, Mutex, MutexGuard};
6
7use futures::channel::oneshot;
8use rosidl_runtime_rs::Message;
9
10use crate::error::{RclReturnCode, ToResult};
11use crate::MessageCow;
12use crate::{rcl_bindings::*, RclrsError};
13
14// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
15// they are running in. Therefore, this type can be safely sent to another thread.
16unsafe impl Send for rcl_client_t {}
17
18/// Internal struct used by clients.
19pub struct ClientHandle {
20    rcl_client_mtx: Mutex<rcl_client_t>,
21    rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
22    pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
23}
24
25impl ClientHandle {
26    pub(crate) fn lock(&self) -> MutexGuard<rcl_client_t> {
27        self.rcl_client_mtx.lock().unwrap()
28    }
29}
30
31impl Drop for ClientHandle {
32    fn drop(&mut self) {
33        let rcl_client = self.rcl_client_mtx.get_mut().unwrap();
34        let rcl_node_mtx = &mut *self.rcl_node_mtx.lock().unwrap();
35        // SAFETY: No preconditions for this function
36        unsafe {
37            rcl_client_fini(rcl_client, rcl_node_mtx);
38        }
39    }
40}
41
42/// Trait to be implemented by concrete Client structs.
43///
44/// See [`Client<T>`] for an example.
45pub trait ClientBase: Send + Sync {
46    /// Internal function to get a reference to the `rcl` handle.
47    fn handle(&self) -> &ClientHandle;
48    /// Tries to take a new response and run the callback or future with it.
49    fn execute(&self) -> Result<(), RclrsError>;
50}
51
52type RequestValue<Response> = Box<dyn FnOnce(Response) + 'static + Send>;
53
54type RequestId = i64;
55
56/// Main class responsible for sending requests to a ROS service.
57///
58/// The only available way to instantiate clients is via [`Node::create_client`][1], this is to
59/// ensure that [`Node`][2]s can track all the clients that have been created.
60///
61/// [1]: crate::Node::create_client
62/// [2]: crate::Node
63pub struct Client<T>
64where
65    T: rosidl_runtime_rs::Service,
66{
67    pub(crate) handle: Arc<ClientHandle>,
68    requests: Mutex<HashMap<RequestId, RequestValue<T::Response>>>,
69    futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
70}
71
72impl<T> Client<T>
73where
74    T: rosidl_runtime_rs::Service,
75{
76    /// Creates a new client.
77    pub(crate) fn new(rcl_node_mtx: Arc<Mutex<rcl_node_t>>, topic: &str) -> Result<Self, RclrsError>
78    // This uses pub(crate) visibility to avoid instantiating this struct outside
79    // [`Node::create_client`], see the struct's documentation for the rationale
80    where
81        T: rosidl_runtime_rs::Service,
82    {
83        // SAFETY: Getting a zero-initialized value is always safe.
84        let mut rcl_client = unsafe { rcl_get_zero_initialized_client() };
85        let type_support = <T as rosidl_runtime_rs::Service>::get_type_support()
86            as *const rosidl_service_type_support_t;
87        let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
88            err,
89            s: topic.into(),
90        })?;
91
92        // SAFETY: No preconditions for this function.
93        let client_options = unsafe { rcl_client_get_default_options() };
94
95        unsafe {
96            // SAFETY: The rcl_client is zero-initialized as expected by this function.
97            // The rcl_node is kept alive because it is co-owned by the client.
98            // The topic name and the options are copied by this function, so they can be dropped
99            // afterwards.
100            rcl_client_init(
101                &mut rcl_client,
102                &*rcl_node_mtx.lock().unwrap(),
103                type_support,
104                topic_c_string.as_ptr(),
105                &client_options,
106            )
107            .ok()?;
108        }
109
110        let handle = Arc::new(ClientHandle {
111            rcl_client_mtx: Mutex::new(rcl_client),
112            rcl_node_mtx,
113            in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
114        });
115
116        Ok(Self {
117            handle,
118            requests: Mutex::new(HashMap::new()),
119            futures: Arc::new(Mutex::new(
120                HashMap::<RequestId, oneshot::Sender<T::Response>>::new(),
121            )),
122        })
123    }
124
125    /// Sends a request with a callback to be called with the response.
126    ///
127    /// The [`MessageCow`] trait is implemented by any
128    /// [`Message`] as well as any reference to a `Message`.
129    ///
130    /// The reason for allowing owned messages is that publishing owned messages can be more
131    /// efficient in the case of idiomatic messages[^note].
132    ///
133    /// [^note]: See the [`Message`] trait for an explanation of "idiomatic".
134    ///
135    /// Hence, when a message will not be needed anymore after publishing, pass it by value.
136    /// When a message will be needed again after publishing, pass it by reference, instead of cloning and passing by value.
137    pub fn async_send_request_with_callback<'a, M: MessageCow<'a, T::Request>, F>(
138        &self,
139        message: M,
140        callback: F,
141    ) -> Result<(), RclrsError>
142    where
143        F: FnOnce(T::Response) + 'static + Send,
144    {
145        let rmw_message = T::Request::into_rmw_message(message.into_cow());
146        let mut sequence_number = -1;
147        unsafe {
148            // SAFETY: The request type is guaranteed to match the client type by the type system.
149            rcl_send_request(
150                &*self.handle.lock() as *const _,
151                rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
152                &mut sequence_number,
153            )
154        }
155        .ok()?;
156        let requests = &mut *self.requests.lock().unwrap();
157        requests.insert(sequence_number, Box::new(callback));
158        Ok(())
159    }
160
161    /// Sends a request and returns the response as a `Future`.
162    ///
163    /// The [`MessageCow`] trait is implemented by any
164    /// [`Message`] as well as any reference to a `Message`.
165    ///
166    /// The reason for allowing owned messages is that publishing owned messages can be more
167    /// efficient in the case of idiomatic messages[^note].
168    ///
169    /// [^note]: See the [`Message`] trait for an explanation of "idiomatic".
170    ///
171    /// Hence, when a message will not be needed anymore after publishing, pass it by value.
172    /// When a message will be needed again after publishing, pass it by reference, instead of cloning and passing by value.
173    pub async fn call_async<'a, R: MessageCow<'a, T::Request>>(
174        &self,
175        request: R,
176    ) -> Result<T::Response, RclrsError>
177    where
178        T: rosidl_runtime_rs::Service,
179    {
180        let rmw_message = T::Request::into_rmw_message(request.into_cow());
181        let mut sequence_number = -1;
182        unsafe {
183            // SAFETY: The request type is guaranteed to match the client type by the type system.
184            rcl_send_request(
185                &*self.handle.lock() as *const _,
186                rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
187                &mut sequence_number,
188            )
189        }
190        .ok()?;
191        let (tx, rx) = oneshot::channel::<T::Response>();
192        self.futures.lock().unwrap().insert(sequence_number, tx);
193        // It is safe to call unwrap() here since the `Canceled` error will only happen when the
194        // `Sender` is dropped
195        // https://docs.rs/futures/latest/futures/channel/oneshot/struct.Canceled.html
196        Ok(rx.await.unwrap())
197    }
198
199    /// Fetches a new response.
200    ///
201    /// When there is no new message, this will return a
202    /// [`ClientTakeFailed`][1].
203    ///
204    /// [1]: crate::RclrsError
205    //
206    // ```text
207    // +----------------------+
208    // | rclrs::take_response |
209    // +----------+-----------+
210    //            |
211    //            |
212    // +----------v-----------+
213    // |   rcl_take_response  |
214    // +----------+-----------+
215    //            |
216    //            |
217    // +----------v----------+
218    // |      rmw_take       |
219    // +---------------------+
220    // ```
221    pub fn take_response(&self) -> Result<(T::Response, rmw_request_id_t), RclrsError> {
222        let mut request_id_out = rmw_request_id_t {
223            writer_guid: [0; 16],
224            sequence_number: 0,
225        };
226        type RmwMsg<T> =
227            <<T as rosidl_runtime_rs::Service>::Response as rosidl_runtime_rs::Message>::RmwMsg;
228        let mut response_out = RmwMsg::<T>::default();
229        let handle = &*self.handle.lock();
230        unsafe {
231            // SAFETY: The three pointers are valid/initialized
232            rcl_take_response(
233                handle,
234                &mut request_id_out,
235                &mut response_out as *mut RmwMsg<T> as *mut _,
236            )
237        }
238        .ok()?;
239        Ok((T::Response::from_rmw_message(response_out), request_id_out))
240    }
241
242    /// Check if a service server is available.
243    ///
244    /// Will return true if there is a service server available, false if unavailable.
245    ///
246    pub fn service_is_ready(&self) -> Result<bool, RclrsError> {
247        let mut is_ready = false;
248        let client = &mut *self.handle.rcl_client_mtx.lock().unwrap();
249        let node = &mut *self.handle.rcl_node_mtx.lock().unwrap();
250
251        unsafe {
252            // SAFETY both node and client are guaranteed to be valid here
253            // client is guaranteed to have been generated with node
254            rcl_service_server_is_available(node as *const _, client as *const _, &mut is_ready)
255        }
256        .ok()?;
257        Ok(is_ready)
258    }
259}
260
261impl<T> ClientBase for Client<T>
262where
263    T: rosidl_runtime_rs::Service,
264{
265    fn handle(&self) -> &ClientHandle {
266        &self.handle
267    }
268
269    fn execute(&self) -> Result<(), RclrsError> {
270        let (res, req_id) = match self.take_response() {
271            Ok((res, req_id)) => (res, req_id),
272            Err(RclrsError::RclError {
273                code: RclReturnCode::ClientTakeFailed,
274                ..
275            }) => {
276                // Spurious wakeup – this may happen even when a waitset indicated that this
277                // client was ready, so it shouldn't be an error.
278                return Ok(());
279            }
280            Err(e) => return Err(e),
281        };
282        let requests = &mut *self.requests.lock().unwrap();
283        let futures = &mut *self.futures.lock().unwrap();
284        if let Some(callback) = requests.remove(&req_id.sequence_number) {
285            callback(res);
286        } else if let Some(future) = futures.remove(&req_id.sequence_number) {
287            let _ = future.send(res);
288        }
289        Ok(())
290    }
291}