do_not_use_testing_rclrs/wait.rs
1// Copyright 2020 DCS Corporation, All Rights Reserved.
2
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6
7// http://www.apache.org/licenses/LICENSE-2.0
8
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// DISTRIBUTION A. Approved for public release; distribution unlimited.
16// OPSEC #4584.
17
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20use std::vec::Vec;
21
22use crate::error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult};
23use crate::rcl_bindings::*;
24use crate::{ClientBase, Context, Node, ServiceBase, SubscriptionBase};
25
26mod exclusivity_guard;
27mod guard_condition;
28use exclusivity_guard::*;
29pub use guard_condition::*;
30
31/// A struct for waiting on subscriptions and other waitable entities to become ready.
32pub struct WaitSet {
33 rcl_wait_set: rcl_wait_set_t,
34 // Used to ensure the context is alive while the wait set is alive.
35 _rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
36 // The subscriptions that are currently registered in the wait set.
37 // This correspondence is an invariant that must be maintained by all functions,
38 // even in the error case.
39 subscriptions: Vec<ExclusivityGuard<Arc<dyn SubscriptionBase>>>,
40 clients: Vec<ExclusivityGuard<Arc<dyn ClientBase>>>,
41 // The guard conditions that are currently registered in the wait set.
42 guard_conditions: Vec<ExclusivityGuard<Arc<GuardCondition>>>,
43 services: Vec<ExclusivityGuard<Arc<dyn ServiceBase>>>,
44}
45
46/// A list of entities that are ready, returned by [`WaitSet::wait`].
47pub struct ReadyEntities {
48 /// A list of subscriptions that have potentially received messages.
49 pub subscriptions: Vec<Arc<dyn SubscriptionBase>>,
50 /// A list of clients that have potentially received responses.
51 pub clients: Vec<Arc<dyn ClientBase>>,
52 /// A list of guard conditions that have been triggered.
53 pub guard_conditions: Vec<Arc<GuardCondition>>,
54 /// A list of services that have potentially received requests.
55 pub services: Vec<Arc<dyn ServiceBase>>,
56}
57
58impl Drop for rcl_wait_set_t {
59 fn drop(&mut self) {
60 // SAFETY: No preconditions for this function (besides passing in a valid wait set).
61 let rc = unsafe { rcl_wait_set_fini(self) };
62 if let Err(e) = to_rclrs_result(rc) {
63 panic!("Unable to release WaitSet. {:?}", e)
64 }
65 }
66}
67
68// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
69// they are running in. Therefore, this type can be safely sent to another thread.
70unsafe impl Send for rcl_wait_set_t {}
71
72// SAFETY: While the rcl_wait_set_t does have some interior mutability (because it has
73// members of non-const pointer type), this interior mutability is hidden/not used by
74// the WaitSet type. Therefore, sharing &WaitSet between threads does not risk data races.
75unsafe impl Sync for WaitSet {}
76
77impl WaitSet {
78 /// Creates a new wait set.
79 ///
80 /// The given number of subscriptions is a capacity, corresponding to how often
81 /// [`WaitSet::add_subscription`] may be called.
82 pub fn new(
83 number_of_subscriptions: usize,
84 number_of_guard_conditions: usize,
85 number_of_timers: usize,
86 number_of_clients: usize,
87 number_of_services: usize,
88 number_of_events: usize,
89 context: &Context,
90 ) -> Result<Self, RclrsError> {
91 let rcl_wait_set = unsafe {
92 // SAFETY: Getting a zero-initialized value is always safe
93 let mut rcl_wait_set = rcl_get_zero_initialized_wait_set();
94 // SAFETY: We're passing in a zero-initialized wait set and a valid context.
95 // There are no other preconditions.
96 rcl_wait_set_init(
97 &mut rcl_wait_set,
98 number_of_subscriptions,
99 number_of_guard_conditions,
100 number_of_timers,
101 number_of_clients,
102 number_of_services,
103 number_of_events,
104 &mut *context.rcl_context_mtx.lock().unwrap(),
105 rcutils_get_default_allocator(),
106 )
107 .ok()?;
108 rcl_wait_set
109 };
110 Ok(Self {
111 rcl_wait_set,
112 _rcl_context_mtx: context.rcl_context_mtx.clone(),
113 subscriptions: Vec::new(),
114 guard_conditions: Vec::new(),
115 clients: Vec::new(),
116 services: Vec::new(),
117 })
118 }
119
120 /// Creates a new wait set and adds all waitable entities in the node to it.
121 ///
122 /// The wait set is sized to fit the node exactly, so there is no capacity for adding other entities.
123 pub fn new_for_node(node: &Node) -> Result<Self, RclrsError> {
124 let live_subscriptions = node.live_subscriptions();
125 let live_clients = node.live_clients();
126 let live_guard_conditions = node.live_guard_conditions();
127 let live_services = node.live_services();
128 let ctx = Context {
129 rcl_context_mtx: node.rcl_context_mtx.clone(),
130 };
131 let mut wait_set = WaitSet::new(
132 live_subscriptions.len(),
133 live_guard_conditions.len(),
134 0,
135 live_clients.len(),
136 live_services.len(),
137 0,
138 &ctx,
139 )?;
140
141 for live_subscription in &live_subscriptions {
142 wait_set.add_subscription(live_subscription.clone())?;
143 }
144
145 for live_client in &live_clients {
146 wait_set.add_client(live_client.clone())?;
147 }
148
149 for live_guard_condition in &live_guard_conditions {
150 wait_set.add_guard_condition(live_guard_condition.clone())?;
151 }
152
153 for live_service in &live_services {
154 wait_set.add_service(live_service.clone())?;
155 }
156 Ok(wait_set)
157 }
158
159 /// Removes all entities from the wait set.
160 ///
161 /// This effectively resets the wait set to the state it was in after being created by
162 /// [`WaitSet::new`].
163 pub fn clear(&mut self) {
164 self.subscriptions.clear();
165 self.guard_conditions.clear();
166 self.clients.clear();
167 self.services.clear();
168 // This cannot fail – the rcl_wait_set_clear function only checks that the input handle is
169 // valid, which it always is in our case. Hence, only debug_assert instead of returning
170 // Result.
171 // SAFETY: No preconditions for this function (besides passing in a valid wait set).
172 let ret = unsafe { rcl_wait_set_clear(&mut self.rcl_wait_set) };
173 debug_assert_eq!(ret, 0);
174 }
175
176 /// Adds a subscription to the wait set.
177 ///
178 /// # Errors
179 /// - If the subscription was already added to this wait set or another one,
180 /// [`AlreadyAddedToWaitSet`][1] will be returned
181 /// - If the number of subscriptions in the wait set is larger than the
182 /// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
183 ///
184 /// [1]: crate::RclrsError
185 /// [2]: crate::RclReturnCode
186 pub fn add_subscription(
187 &mut self,
188 subscription: Arc<dyn SubscriptionBase>,
189 ) -> Result<(), RclrsError> {
190 let exclusive_subscription = ExclusivityGuard::new(
191 Arc::clone(&subscription),
192 Arc::clone(&subscription.handle().in_use_by_wait_set),
193 )?;
194 unsafe {
195 // SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid
196 // for as long as the wait set exists, because it's stored in self.subscriptions.
197 // Passing in a null pointer for the third argument is explicitly allowed.
198 rcl_wait_set_add_subscription(
199 &mut self.rcl_wait_set,
200 &*subscription.handle().lock(),
201 std::ptr::null_mut(),
202 )
203 }
204 .ok()?;
205 self.subscriptions.push(exclusive_subscription);
206 Ok(())
207 }
208
209 /// Adds a guard condition to the wait set.
210 ///
211 /// # Errors
212 /// - If the guard condition was already added to this wait set or another one,
213 /// [`AlreadyAddedToWaitSet`][1] will be returned
214 /// - If the number of guard conditions in the wait set is larger than the
215 /// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
216 ///
217 /// [1]: crate::RclrsError
218 /// [2]: crate::RclReturnCode
219 pub fn add_guard_condition(
220 &mut self,
221 guard_condition: Arc<GuardCondition>,
222 ) -> Result<(), RclrsError> {
223 let exclusive_guard_condition = ExclusivityGuard::new(
224 Arc::clone(&guard_condition),
225 Arc::clone(&guard_condition.in_use_by_wait_set),
226 )?;
227
228 unsafe {
229 // SAFETY: Safe if the wait set and guard condition are initialized
230 rcl_wait_set_add_guard_condition(
231 &mut self.rcl_wait_set,
232 &*guard_condition.rcl_guard_condition.lock().unwrap(),
233 std::ptr::null_mut(),
234 )
235 .ok()?;
236 }
237 self.guard_conditions.push(exclusive_guard_condition);
238 Ok(())
239 }
240
241 /// Adds a client to the wait set.
242 ///
243 /// # Errors
244 /// - If the client was already added to this wait set or another one,
245 /// [`AlreadyAddedToWaitSet`][1] will be returned
246 /// - If the number of clients in the wait set is larger than the
247 /// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
248 ///
249 /// [1]: crate::RclrsError
250 /// [2]: crate::RclReturnCode
251 pub fn add_client(&mut self, client: Arc<dyn ClientBase>) -> Result<(), RclrsError> {
252 let exclusive_client = ExclusivityGuard::new(
253 Arc::clone(&client),
254 Arc::clone(&client.handle().in_use_by_wait_set),
255 )?;
256 unsafe {
257 // SAFETY: I'm not sure if it's required, but the client pointer will remain valid
258 // for as long as the wait set exists, because it's stored in self.clients.
259 // Passing in a null pointer for the third argument is explicitly allowed.
260 rcl_wait_set_add_client(
261 &mut self.rcl_wait_set,
262 &*client.handle().lock() as *const _,
263 core::ptr::null_mut(),
264 )
265 }
266 .ok()?;
267 self.clients.push(exclusive_client);
268 Ok(())
269 }
270
271 /// Adds a service to the wait set.
272 ///
273 /// # Errors
274 /// - If the service was already added to this wait set or another one,
275 /// [`AlreadyAddedToWaitSet`][1] will be returned
276 /// - If the number of services in the wait set is larger than the
277 /// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned
278 ///
279 /// [1]: crate::RclrsError
280 /// [2]: crate::RclReturnCode
281 pub fn add_service(&mut self, service: Arc<dyn ServiceBase>) -> Result<(), RclrsError> {
282 let exclusive_service = ExclusivityGuard::new(
283 Arc::clone(&service),
284 Arc::clone(&service.handle().in_use_by_wait_set),
285 )?;
286 unsafe {
287 // SAFETY: I'm not sure if it's required, but the service pointer will remain valid
288 // for as long as the wait set exists, because it's stored in self.services.
289 // Passing in a null pointer for the third argument is explicitly allowed.
290 rcl_wait_set_add_service(
291 &mut self.rcl_wait_set,
292 &*service.handle().lock() as *const _,
293 core::ptr::null_mut(),
294 )
295 }
296 .ok()?;
297 self.services.push(exclusive_service);
298 Ok(())
299 }
300
301 /// Blocks until the wait set is ready, or until the timeout has been exceeded.
302 ///
303 /// If the timeout is `None` then this function will block indefinitely until
304 /// something in the wait set is valid or it is interrupted.
305 ///
306 /// If the timeout is [`Duration::ZERO`][1] then this function will be non-blocking; checking what's
307 /// ready now, but not waiting if nothing is ready yet.
308 ///
309 /// If the timeout is greater than [`Duration::ZERO`][1] then this function will return after
310 /// that period of time has elapsed or the wait set becomes ready, which ever
311 /// comes first.
312 ///
313 /// This function does not change the entities registered in the wait set.
314 ///
315 /// # Errors
316 ///
317 /// - Passing a wait set with no wait-able items in it will return an error.
318 /// - The timeout must not be so large so as to overflow an `i64` with its nanosecond
319 /// representation, or an error will occur.
320 ///
321 /// This list is not comprehensive, since further errors may occur in the `rmw` or `rcl` layers.
322 ///
323 /// [1]: std::time::Duration::ZERO
324 pub fn wait(mut self, timeout: Option<Duration>) -> Result<ReadyEntities, RclrsError> {
325 let timeout_ns = match timeout.map(|d| d.as_nanos()) {
326 None => -1,
327 Some(ns) if ns <= i64::MAX as u128 => ns as i64,
328 _ => {
329 return Err(RclrsError::RclError {
330 code: RclReturnCode::InvalidArgument,
331 msg: None,
332 })
333 }
334 };
335 // SAFETY: The comments in rcl mention "This function cannot operate on the same wait set
336 // in multiple threads, and the wait sets may not share content."
337 // We cannot currently guarantee that the wait sets may not share content, but it is
338 // mentioned in the doc comment for `add_subscription`.
339 // Also, the rcl_wait_set is obviously valid.
340 match unsafe { rcl_wait(&mut self.rcl_wait_set, timeout_ns) }.ok() {
341 Ok(_) => (),
342 Err(error) => match error {
343 RclrsError::RclError { code, msg } => match code {
344 RclReturnCode::WaitSetEmpty => (),
345 _ => return Err(RclrsError::RclError { code, msg }),
346 },
347 _ => return Err(error),
348 },
349 }
350 let mut ready_entities = ReadyEntities {
351 subscriptions: Vec::new(),
352 clients: Vec::new(),
353 guard_conditions: Vec::new(),
354 services: Vec::new(),
355 };
356 for (i, subscription) in self.subscriptions.iter().enumerate() {
357 // SAFETY: The `subscriptions` entry is an array of pointers, and this dereferencing is
358 // equivalent to
359 // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
360 let wait_set_entry = unsafe { *self.rcl_wait_set.subscriptions.add(i) };
361 if !wait_set_entry.is_null() {
362 ready_entities
363 .subscriptions
364 .push(Arc::clone(&subscription.waitable));
365 }
366 }
367
368 for (i, client) in self.clients.iter().enumerate() {
369 // SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
370 // equivalent to
371 // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
372 let wait_set_entry = unsafe { *self.rcl_wait_set.clients.add(i) };
373 if !wait_set_entry.is_null() {
374 ready_entities.clients.push(Arc::clone(&client.waitable));
375 }
376 }
377
378 for (i, guard_condition) in self.guard_conditions.iter().enumerate() {
379 // SAFETY: The `clients` entry is an array of pointers, and this dereferencing is
380 // equivalent to
381 // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
382 let wait_set_entry = unsafe { *self.rcl_wait_set.guard_conditions.add(i) };
383 if !wait_set_entry.is_null() {
384 ready_entities
385 .guard_conditions
386 .push(Arc::clone(&guard_condition.waitable));
387 }
388 }
389
390 for (i, service) in self.services.iter().enumerate() {
391 // SAFETY: The `services` entry is an array of pointers, and this dereferencing is
392 // equivalent to
393 // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419
394 let wait_set_entry = unsafe { *self.rcl_wait_set.services.add(i) };
395 if !wait_set_entry.is_null() {
396 ready_entities.services.push(Arc::clone(&service.waitable));
397 }
398 }
399 Ok(ready_entities)
400 }
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    // These helpers only compile when `T` satisfies the respective auto trait.
    fn assert_send<T: Send>() {}
    fn assert_sync<T: Sync>() {}

    /// `WaitSet` must be usable from, and shareable between, threads.
    #[test]
    fn wait_set_is_send_and_sync() {
        assert_send::<WaitSet>();
        assert_sync::<WaitSet>();
    }

    /// A guard condition triggered before waiting must be reported as ready.
    #[test]
    fn guard_condition_in_wait_set_readies() -> Result<(), RclrsError> {
        let context = Context::new([])?;
        let trigger = Arc::new(GuardCondition::new(&context));

        // Capacity for exactly one guard condition and nothing else.
        let mut set = WaitSet::new(0, 1, 0, 0, 0, 0, &context)?;
        set.add_guard_condition(Arc::clone(&trigger))?;
        trigger.trigger()?;

        let timeout = std::time::Duration::from_millis(10);
        let ready = set.wait(Some(timeout))?;
        assert!(ready.guard_conditions.contains(&trigger));

        Ok(())
    }
}